From 50d184b6a6b3f259045b5eb5e5405d4982371657 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Wed, 22 Oct 2025 00:15:20 +0200 Subject: [PATCH] Revert "search: New old search implementation (#39956)" (#40831) This reverts commit 7c4fb5a899c34efaea9e52ddd84daebc8d9ccf49. Closes #40792 Release Notes: - N/A *or* Added/Fixed/Improved ... --- Cargo.lock | 27 - Cargo.toml | 2 - crates/project/src/buffer_store.rs | 71 +- crates/project/src/project.rs | 235 ++++-- crates/project/src/project_search.rs | 754 ------------------- crates/project/src/worktree_store.rs | 151 +++- crates/project_benchmarks/Cargo.toml | 21 - crates/project_benchmarks/LICENSE-GPL | 1 - crates/project_benchmarks/src/main.rs | 136 ---- crates/remote_server/Cargo.toml | 2 +- crates/remote_server/src/headless_project.rs | 12 +- 11 files changed, 389 insertions(+), 1023 deletions(-) delete mode 100644 crates/project/src/project_search.rs delete mode 100644 crates/project_benchmarks/Cargo.toml delete mode 120000 crates/project_benchmarks/LICENSE-GPL delete mode 100644 crates/project_benchmarks/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index d116940e5de7968cba610eaf842c6bcdb334df77..08bffae4bc9c901aa3f12d7df76cf02048f7c51d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12889,23 +12889,6 @@ dependencies = [ "zlog", ] -[[package]] -name = "project_benchmarks" -version = "0.1.0" -dependencies = [ - "anyhow", - "clap", - "client", - "futures 0.3.31", - "gpui", - "http_client", - "language", - "node_runtime", - "project", - "settings", - "watch", -] - [[package]] name = "project_panel" version = "0.1.0" @@ -20606,16 +20589,6 @@ dependencies = [ "zlog", ] -[[package]] -name = "worktree_benchmarks" -version = "0.1.0" -dependencies = [ - "fs", - "gpui", - "settings", - "worktree", -] - [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index 9637569566e7d3ebc8035d7a93b6e35d2ef84b70..f5b9a809de680b89c1d989dc8541c4aa308f24bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,7 +126,6 @@ members = [ "crates/picker", "crates/prettier", "crates/project", - "crates/project_benchmarks", "crates/project_panel", "crates/project_symbols", "crates/prompt_store", @@ -195,7 +194,6 @@ members = [ "crates/web_search_providers", "crates/workspace", "crates/worktree", - "crates/worktree_benchmarks", "crates/x_ai", "crates/zed", "crates/zed_actions", diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 51bca611f4b69546cc358cb59724dbb7f98d219e..442cd35dc1b171a1510439f5314d3f543293350f 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -1,12 +1,14 @@ use crate::{ - ProjectPath, + ProjectItem as _, ProjectPath, lsp_store::OpenLspBufferHandle, + search::SearchQuery, worktree_store::{WorktreeStore, WorktreeStoreEvent}, }; use anyhow::{Context as _, Result, anyhow}; use client::Client; use collections::{HashMap, HashSet, hash_map}; -use futures::{Future, FutureExt as _, channel::oneshot, future::Shared}; +use fs::Fs; +use futures::{Future, FutureExt as _, StreamExt, channel::oneshot, future::Shared}; use gpui::{ App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, }; @@ -21,8 +23,8 @@ use rpc::{ AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope, proto::{self}, }; - -use std::{io, sync::Arc, time::Instant}; +use smol::channel::Receiver; +use std::{io, pin::pin, sync::Arc, time::Instant}; use text::{BufferId, ReplicaId}; use util::{ResultExt as _, 
TryFutureExt, debug_panic, maybe, paths::PathStyle, rel_path::RelPath}; use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId}; @@ -973,10 +975,6 @@ impl BufferStore { .filter_map(|buffer| buffer.upgrade()) } - pub(crate) fn is_searchable(&self, id: &BufferId) -> bool { - !self.non_searchable_buffers.contains(&id) - } - pub fn loading_buffers( &self, ) -> impl Iterator>>)> { @@ -1101,6 +1099,63 @@ impl BufferStore { Some(()) } + pub fn find_search_candidates( + &mut self, + query: &SearchQuery, + mut limit: usize, + fs: Arc, + cx: &mut Context, + ) -> Receiver> { + let (tx, rx) = smol::channel::unbounded(); + let mut open_buffers = HashSet::default(); + let mut unnamed_buffers = Vec::new(); + for handle in self.buffers() { + let buffer = handle.read(cx); + if self.non_searchable_buffers.contains(&buffer.remote_id()) { + continue; + } else if let Some(entry_id) = buffer.entry_id(cx) { + open_buffers.insert(entry_id); + } else { + limit = limit.saturating_sub(1); + unnamed_buffers.push(handle) + }; + } + + const MAX_CONCURRENT_BUFFER_OPENS: usize = 64; + let project_paths_rx = self + .worktree_store + .update(cx, |worktree_store, cx| { + worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx) + }) + .chunks(MAX_CONCURRENT_BUFFER_OPENS); + + cx.spawn(async move |this, cx| { + for buffer in unnamed_buffers { + tx.send(buffer).await.ok(); + } + + let mut project_paths_rx = pin!(project_paths_rx); + while let Some(project_paths) = project_paths_rx.next().await { + let buffers = this.update(cx, |this, cx| { + project_paths + .into_iter() + .map(|project_path| this.open_buffer(project_path, cx)) + .collect::>() + })?; + for buffer_task in buffers { + if let Some(buffer) = buffer_task.await.log_err() + && tx.send(buffer).await.is_err() + { + return anyhow::Ok(()); + } + } + } + anyhow::Ok(()) + }) + .detach(); + rx + } + fn on_buffer_event( &mut self, buffer: Entity, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 1fd4bcd583908f0428586b988d82808598a501b3..d167434c52abc161f81d92e2a51c992d52fb9872 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -11,7 +11,6 @@ pub mod lsp_command; pub mod lsp_store; mod manifest_tree; pub mod prettier_store; -mod project_search; pub mod project_settings; pub mod search; mod task_inventory; @@ -41,7 +40,6 @@ use crate::{ agent_server_store::AllAgentServersSettings, git_store::GitStore, lsp_store::{SymbolLocation, log_store::LogKind}, - project_search::SearchResultsHandle, }; pub use agent_server_store::{AgentServerStore, AgentServersUpdated}; pub use git_store::{ @@ -49,7 +47,6 @@ pub use git_store::{ git_traversal::{ChildEntriesGitIter, GitEntry, GitEntryRef, GitTraversal}, }; pub use manifest_tree::ManifestTree; -pub use project_search::Search; use anyhow::{Context as _, Result, anyhow}; use buffer_store::{BufferStore, BufferStoreEvent}; @@ -113,7 +110,7 @@ use snippet_provider::SnippetProvider; use std::{ borrow::Cow, collections::BTreeMap, - ops::{Not as _, Range}, + ops::Range, path::{Path, PathBuf}, pin::pin, str, @@ -127,7 +124,7 @@ use text::{Anchor, BufferId, OffsetRangeExt, Point, Rope}; use toolchain_store::EmptyToolchainStore; use util::{ ResultExt as _, maybe, - paths::{PathStyle, SanitizedPath, is_absolute}, + paths::{PathStyle, SanitizedPath, compare_paths, is_absolute}, rel_path::RelPath, }; use worktree::{CreatedEntry, Snapshot, Traversal}; @@ -154,6 +151,8 @@ pub use lsp_store::{ }; pub use toolchain_store::{ToolchainStore, Toolchains}; const 
MAX_PROJECT_SEARCH_HISTORY_SIZE: usize = 500; +const MAX_SEARCH_RESULT_FILES: usize = 5_000; +const MAX_SEARCH_RESULT_RANGES: usize = 10_000; pub trait ProjectItem: 'static { fn try_open( @@ -3938,44 +3937,179 @@ impl Project { }) } - fn search_impl(&mut self, query: SearchQuery, cx: &mut Context) -> SearchResultsHandle { - let client: Option<(AnyProtoClient, _)> = if let Some(ssh_client) = &self.remote_client { - Some((ssh_client.read(cx).proto_client(), 0)) - } else if let Some(remote_id) = self.remote_id() { - self.is_local() - .not() - .then(|| (self.collab_client.clone().into(), remote_id)) + pub fn search(&mut self, query: SearchQuery, cx: &mut Context) -> Receiver { + let (result_tx, result_rx) = smol::channel::unbounded(); + + let matching_buffers_rx = if query.is_opened_only() { + self.sort_search_candidates(&query, cx) } else { - None + self.find_search_candidate_buffers(&query, MAX_SEARCH_RESULT_FILES + 1, cx) }; - let searcher = if query.is_opened_only() { - project_search::Search::open_buffers_only( - self.buffer_store.clone(), - self.worktree_store.clone(), - project_search::Search::MAX_SEARCH_RESULT_FILES + 1, - ) - } else { - match client { - Some((client, remote_id)) => project_search::Search::remote( - self.buffer_store.clone(), - self.worktree_store.clone(), - project_search::Search::MAX_SEARCH_RESULT_FILES + 1, - (client, remote_id, self.remotely_created_models.clone()), - ), - None => project_search::Search::local( - self.fs.clone(), - self.buffer_store.clone(), - self.worktree_store.clone(), - project_search::Search::MAX_SEARCH_RESULT_FILES + 1, - cx, - ), + + cx.spawn(async move |_, cx| { + let mut range_count = 0; + let mut buffer_count = 0; + let mut limit_reached = false; + let query = Arc::new(query); + let chunks = matching_buffers_rx.ready_chunks(64); + + // Now that we know what paths match the query, we will load at most + // 64 buffers at a time to avoid overwhelming the main thread. For each + // opened buffer, we will spawn a background task that retrieves all the + // ranges in the buffer matched by the query. 
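In other words, candidate buffers stream in, are processed in bounded chunks, and the result stream is cut off once either cap trips. A minimal synchronous sketch of that chunk-and-cap shape, using std channels and simplified stand-in types (SketchResult here is hypothetical, not the project's SearchResult):

use std::sync::mpsc;

const MAX_RESULT_FILES: usize = 5_000;
const MAX_RESULT_RANGES: usize = 10_000;

enum SketchResult {
    Buffer { path: String, ranges: Vec<(usize, usize)> },
    LimitReached,
}

fn stream_results(
    candidates: impl IntoIterator<Item = (String, Vec<(usize, usize)>)>,
    tx: mpsc::Sender<SketchResult>,
) {
    let (mut file_count, mut range_count) = (0usize, 0usize);
    let mut candidates = candidates.into_iter().peekable();
    while candidates.peek().is_some() {
        // Take at most 64 candidates per round, mirroring the chunked buffer opening.
        for (path, ranges) in candidates.by_ref().take(64) {
            file_count += 1;
            range_count += ranges.len();
            if tx.send(SketchResult::Buffer { path, ranges }).is_err() {
                return; // the consumer hung up; stop searching early
            }
            if file_count > MAX_RESULT_FILES || range_count > MAX_RESULT_RANGES {
                let _ = tx.send(SketchResult::LimitReached);
                return;
            }
        }
    }
}

A consumer simply drains the receiver until it sees LimitReached or the channel closes.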
+ let mut chunks = pin!(chunks); + 'outer: while let Some(matching_buffer_chunk) = chunks.next().await { + let mut chunk_results = Vec::with_capacity(matching_buffer_chunk.len()); + for buffer in matching_buffer_chunk { + let query = query.clone(); + let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot())?; + chunk_results.push(cx.background_spawn(async move { + let ranges = query + .search(&snapshot, None) + .await + .iter() + .map(|range| { + snapshot.anchor_before(range.start) + ..snapshot.anchor_after(range.end) + }) + .collect::>(); + anyhow::Ok((buffer, ranges)) + })); + } + + let chunk_results = futures::future::join_all(chunk_results).await; + for result in chunk_results { + if let Some((buffer, ranges)) = result.log_err() { + range_count += ranges.len(); + buffer_count += 1; + result_tx + .send(SearchResult::Buffer { buffer, ranges }) + .await?; + if buffer_count > MAX_SEARCH_RESULT_FILES + || range_count > MAX_SEARCH_RESULT_RANGES + { + limit_reached = true; + break 'outer; + } + } + } } - }; - searcher.into_handle(query, cx) + + if limit_reached { + result_tx.send(SearchResult::LimitReached).await?; + } + + anyhow::Ok(()) + }) + .detach(); + + result_rx } - pub fn search(&mut self, query: SearchQuery, cx: &mut Context) -> Receiver { - self.search_impl(query, cx).results(cx) + fn find_search_candidate_buffers( + &mut self, + query: &SearchQuery, + limit: usize, + cx: &mut Context, + ) -> Receiver> { + if self.is_local() { + let fs = self.fs.clone(); + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.find_search_candidates(query, limit, fs, cx) + }) + } else { + self.find_search_candidates_remote(query, limit, cx) + } + } + + fn sort_search_candidates( + &mut self, + search_query: &SearchQuery, + cx: &mut Context, + ) -> Receiver> { + let worktree_store = self.worktree_store.read(cx); + let mut buffers = search_query + .buffers() + .into_iter() + .flatten() + .filter(|buffer| { + let b = buffer.read(cx); + if let Some(file) = b.file() { + if !search_query.match_path(file.path().as_std_path()) { + return false; + } + if let Some(entry) = b + .entry_id(cx) + .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx)) + && entry.is_ignored + && !search_query.include_ignored() + { + return false; + } + } + true + }) + .collect::>(); + let (tx, rx) = smol::channel::unbounded(); + buffers.sort_by(|a, b| match (a.read(cx).file(), b.read(cx).file()) { + (None, None) => a.read(cx).remote_id().cmp(&b.read(cx).remote_id()), + (None, Some(_)) => std::cmp::Ordering::Less, + (Some(_), None) => std::cmp::Ordering::Greater, + (Some(a), Some(b)) => compare_paths( + (a.path().as_std_path(), true), + (b.path().as_std_path(), true), + ), + }); + for buffer in buffers { + tx.send_blocking(buffer.clone()).unwrap() + } + + rx + } + + fn find_search_candidates_remote( + &mut self, + query: &SearchQuery, + limit: usize, + cx: &mut Context, + ) -> Receiver> { + let (tx, rx) = smol::channel::unbounded(); + + let (client, remote_id): (AnyProtoClient, _) = if let Some(ssh_client) = &self.remote_client + { + (ssh_client.read(cx).proto_client(), 0) + } else if let Some(remote_id) = self.remote_id() { + (self.collab_client.clone().into(), remote_id) + } else { + return rx; + }; + + let request = client.request(proto::FindSearchCandidates { + project_id: remote_id, + query: Some(query.to_proto()), + limit: limit as _, + }); + let guard = self.retain_remotely_created_models(cx); + + cx.spawn(async move |project, cx| { + let response = request.await?; + for buffer_id in 
response.buffer_ids { + let buffer_id = BufferId::new(buffer_id)?; + let buffer = project + .update(cx, |project, cx| { + project.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.wait_for_remote_buffer(buffer_id, cx) + }) + })? + .await?; + let _ = tx.send(buffer).await; + } + + drop(guard); + anyhow::Ok(()) + }) + .detach_and_log_err(cx); + rx } pub fn request_lsp( @@ -4700,31 +4834,18 @@ impl Project { fn retain_remotely_created_models( &mut self, cx: &mut Context, - ) -> RemotelyCreatedModelGuard { - Self::retain_remotely_created_models_impl( - &self.remotely_created_models, - &self.buffer_store, - &self.worktree_store, - cx, - ) - } - - fn retain_remotely_created_models_impl( - models: &Arc>, - buffer_store: &Entity, - worktree_store: &Entity, - cx: &mut App, ) -> RemotelyCreatedModelGuard { { - let mut remotely_create_models = models.lock(); + let mut remotely_create_models = self.remotely_created_models.lock(); if remotely_create_models.retain_count == 0 { - remotely_create_models.buffers = buffer_store.read(cx).buffers().collect(); - remotely_create_models.worktrees = worktree_store.read(cx).worktrees().collect(); + remotely_create_models.buffers = self.buffer_store.read(cx).buffers().collect(); + remotely_create_models.worktrees = + self.worktree_store.read(cx).worktrees().collect(); } remotely_create_models.retain_count += 1; } RemotelyCreatedModelGuard { - remote_models: Arc::downgrade(&models), + remote_models: Arc::downgrade(&self.remotely_created_models), } } @@ -4794,7 +4915,7 @@ impl Project { let query = SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?; let results = this.update(&mut cx, |this, cx| { - this.search_impl(query, cx).matching_buffers(cx) + this.find_search_candidate_buffers(&query, message.limit as _, cx) })?; let mut response = proto::FindSearchCandidatesResponse { diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs deleted file mode 100644 index 25fe578bd7dc2302645dcfb4fd557de1f2b22081..0000000000000000000000000000000000000000 --- a/crates/project/src/project_search.rs +++ /dev/null @@ -1,754 +0,0 @@ -use std::{ - io::{BufRead, BufReader}, - path::Path, - pin::pin, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, -}; - -use anyhow::Context; -use collections::HashSet; -use fs::Fs; -use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered}; -use gpui::{App, AppContext, AsyncApp, Entity, Task}; -use language::{Buffer, BufferSnapshot}; -use parking_lot::Mutex; -use postage::oneshot; -use rpc::{AnyProtoClient, proto}; -use smol::{ - channel::{Receiver, Sender, bounded, unbounded}, - future::FutureExt, -}; - -use text::BufferId; -use util::{ResultExt, maybe, paths::compare_rel_paths}; -use worktree::{Entry, ProjectEntryId, Snapshot, Worktree}; - -use crate::{ - Project, ProjectItem, ProjectPath, RemotelyCreatedModels, - buffer_store::BufferStore, - search::{SearchQuery, SearchResult}, - worktree_store::WorktreeStore, -}; - -pub struct Search { - buffer_store: Entity, - worktree_store: Entity, - limit: usize, - kind: SearchKind, -} - -/// Represents search setup, before it is actually kicked off with Search::into_results -enum SearchKind { - /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match. - Local { - fs: Arc, - worktrees: Vec>, - }, - /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode. 
- Remote { - client: AnyProtoClient, - remote_id: u64, - models: Arc>, - }, - /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host. - OpenBuffersOnly, -} - -/// Represents results of project search and allows one to either obtain match positions OR -/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for -/// at most one match in each file. -#[must_use] -pub struct SearchResultsHandle { - results: Receiver, - matching_buffers: Receiver>, - trigger_search: Box Task<()> + Send + Sync>, -} - -impl SearchResultsHandle { - pub fn results(self, cx: &mut App) -> Receiver { - (self.trigger_search)(cx).detach(); - self.results - } - pub fn matching_buffers(self, cx: &mut App) -> Receiver> { - (self.trigger_search)(cx).detach(); - self.matching_buffers - } -} - -#[derive(Clone)] -enum FindSearchCandidates { - Local { - fs: Arc, - /// Start off with all paths in project and filter them based on: - /// - Include filters - /// - Exclude filters - /// - Only open buffers - /// - Scan ignored files - /// Put another way: filter out files that can't match (without looking at file contents) - input_paths_rx: Receiver, - /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match - /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory. - confirm_contents_will_match_tx: Sender, - confirm_contents_will_match_rx: Receiver, - /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges). - /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it). - get_buffer_for_full_scan_tx: Sender, - }, - Remote, - OpenBuffersOnly, -} - -impl Search { - pub fn local( - fs: Arc, - buffer_store: Entity, - worktree_store: Entity, - limit: usize, - cx: &mut App, - ) -> Self { - let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect(); - Self { - kind: SearchKind::Local { fs, worktrees }, - buffer_store, - worktree_store, - limit, - } - } - - pub(crate) fn remote( - buffer_store: Entity, - worktree_store: Entity, - limit: usize, - client_state: (AnyProtoClient, u64, Arc>), - ) -> Self { - Self { - kind: SearchKind::Remote { - client: client_state.0, - remote_id: client_state.1, - models: client_state.2, - }, - buffer_store, - worktree_store, - limit, - } - } - pub(crate) fn open_buffers_only( - buffer_store: Entity, - worktree_store: Entity, - limit: usize, - ) -> Self { - Self { - kind: SearchKind::OpenBuffersOnly, - buffer_store, - worktree_store, - limit, - } - } - - pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000; - pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000; - /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers - /// or full search results. 
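That is, the handle is lazy: nothing is spawned until the caller picks an accessor (the real handle exposes matching_buffers alongside results). A small std-only sketch of that defer-until-consumed shape, with a plain closure standing in for the spawned search tasks:

use std::sync::mpsc::{self, Receiver};

struct ResultsHandle<T> {
    results: Receiver<T>,
    // The search only starts when an accessor below is called.
    trigger: Box<dyn FnOnce() + Send>,
}

impl<T> ResultsHandle<T> {
    fn new(results: Receiver<T>, trigger: impl FnOnce() + Send + 'static) -> Self {
        Self { results, trigger: Box::new(trigger) }
    }

    /// Kick off the search and hand back the stream of results.
    fn results(self) -> Receiver<T> {
        (self.trigger)();
        self.results
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = ResultsHandle::new(rx, move || {
        // In the real code this spawns the search pipeline; here we just produce one value.
        tx.send("match in src/lib.rs").unwrap();
    });
    for result in handle.results() {
        println!("{result}");
    }
}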
- pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle { - let mut open_buffers = HashSet::default(); - let mut unnamed_buffers = Vec::new(); - const MAX_CONCURRENT_BUFFER_OPENS: usize = 64; - let buffers = self.buffer_store.read(cx); - for handle in buffers.buffers() { - let buffer = handle.read(cx); - if !buffers.is_searchable(&buffer.remote_id()) { - continue; - } else if let Some(entry_id) = buffer.entry_id(cx) { - open_buffers.insert(entry_id); - } else { - self.limit -= self.limit.saturating_sub(1); - unnamed_buffers.push(handle) - }; - } - let executor = cx.background_executor().clone(); - let (tx, rx) = unbounded(); - let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded(); - let matching_buffers = grab_buffer_snapshot_rx.clone(); - let trigger_search = Box::new(move |cx: &mut App| { - cx.spawn(async move |cx| { - for buffer in unnamed_buffers { - _ = grab_buffer_snapshot_tx.send(buffer).await; - } - - let (find_all_matches_tx, find_all_matches_rx) = - bounded(MAX_CONCURRENT_BUFFER_OPENS); - - let (candidate_searcher, tasks) = match self.kind { - SearchKind::OpenBuffersOnly => { - let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx)) - else { - return; - }; - let fill_requests = cx - .background_spawn(async move { - for buffer in open_buffers { - if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await { - return; - } - } - }) - .boxed_local(); - (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests]) - } - SearchKind::Local { - fs, - ref mut worktrees, - } => { - let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = - unbounded(); - let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = - bounded(64); - let (sorted_search_results_tx, sorted_search_results_rx) = unbounded(); - - let (input_paths_tx, input_paths_rx) = unbounded(); - - let tasks = vec![ - cx.spawn(Self::provide_search_paths( - std::mem::take(worktrees), - query.include_ignored(), - input_paths_tx, - sorted_search_results_tx, - )) - .boxed_local(), - Self::open_buffers( - &self.buffer_store, - get_buffer_for_full_scan_rx, - grab_buffer_snapshot_tx, - cx.clone(), - ) - .boxed_local(), - cx.background_spawn(Self::maintain_sorted_search_results( - sorted_search_results_rx, - get_buffer_for_full_scan_tx.clone(), - self.limit, - )) - .boxed_local(), - ]; - ( - FindSearchCandidates::Local { - fs, - get_buffer_for_full_scan_tx, - confirm_contents_will_match_tx, - confirm_contents_will_match_rx, - input_paths_rx, - }, - tasks, - ) - } - SearchKind::Remote { - client, - remote_id, - models, - } => { - let request = client.request(proto::FindSearchCandidates { - project_id: remote_id, - query: Some(query.to_proto()), - limit: self.limit as _, - }); - let Ok(guard) = cx.update(|cx| { - Project::retain_remotely_created_models_impl( - &models, - &self.buffer_store, - &self.worktree_store, - cx, - ) - }) else { - return; - }; - let buffer_store = self.buffer_store.downgrade(); - let issue_remote_buffers_request = cx - .spawn(async move |cx| { - let _ = maybe!(async move { - let response = request.await?; - - for buffer_id in response.buffer_ids { - let buffer_id = BufferId::new(buffer_id)?; - let buffer = buffer_store - .update(cx, |buffer_store, cx| { - buffer_store.wait_for_remote_buffer(buffer_id, cx) - })? 
- .await?; - let _ = grab_buffer_snapshot_tx.send(buffer).await; - } - - drop(guard); - anyhow::Ok(()) - }) - .await - .log_err(); - }) - .boxed_local(); - ( - FindSearchCandidates::Remote, - vec![issue_remote_buffers_request], - ) - } - }; - - let matches_count = AtomicUsize::new(0); - let matched_buffer_count = AtomicUsize::new(0); - - let worker_pool = executor.scoped(|scope| { - let num_cpus = executor.num_cpus(); - - assert!(num_cpus > 0); - for _ in 0..executor.num_cpus() - 1 { - let worker = Worker { - query: &query, - open_buffers: &open_buffers, - matched_buffer_count: &matched_buffer_count, - matches_count: &matches_count, - candidates: candidate_searcher.clone(), - find_all_matches_rx: find_all_matches_rx.clone(), - publish_matches: tx.clone(), - }; - scope.spawn(worker.run()); - } - drop(tx); - drop(find_all_matches_rx); - drop(candidate_searcher); - }); - - let buffer_snapshots = Self::grab_buffer_snapshots( - grab_buffer_snapshot_rx, - find_all_matches_tx, - cx.clone(), - ); - futures::future::join_all( - [worker_pool.boxed_local(), buffer_snapshots.boxed_local()] - .into_iter() - .chain(tasks), - ) - .await; - }) - }); - - SearchResultsHandle { - results: rx, - matching_buffers, - trigger_search, - } - } - - fn provide_search_paths( - worktrees: Vec>, - include_ignored: bool, - tx: Sender, - results: Sender>, - ) -> impl AsyncFnOnce(&mut AsyncApp) { - async move |cx| { - _ = maybe!(async move { - for worktree in worktrees { - let (mut snapshot, worktree_settings) = worktree - .read_with(cx, |this, _| { - Some((this.snapshot(), this.as_local()?.settings())) - })? - .context("The worktree is not local")?; - if include_ignored { - // Pre-fetch all of the ignored directories as they're going to be searched. - let mut entries_to_refresh = vec![]; - for entry in snapshot.entries(include_ignored, 0) { - if entry.is_ignored && entry.kind.is_unloaded() { - if !worktree_settings.is_path_excluded(&entry.path) { - entries_to_refresh.push(entry.path.clone()); - } - } - } - let barrier = worktree.update(cx, |this, _| { - let local = this.as_local_mut()?; - let barrier = entries_to_refresh - .into_iter() - .map(|path| local.add_path_prefix_to_scan(path).into_future()) - .collect::>(); - Some(barrier) - })?; - if let Some(barriers) = barrier { - futures::future::join_all(barriers).await; - } - snapshot = worktree.read_with(cx, |this, _| this.snapshot())?; - } - cx.background_executor() - .scoped(|scope| { - scope.spawn(async { - for entry in snapshot.files(include_ignored, 0) { - let (should_scan_tx, should_scan_rx) = oneshot::channel(); - let Ok(_) = tx - .send(InputPath { - entry: entry.clone(), - snapshot: snapshot.clone(), - should_scan_tx, - }) - .await - else { - return; - }; - if results.send(should_scan_rx).await.is_err() { - return; - }; - } - }) - }) - .await; - } - anyhow::Ok(()) - }) - .await; - } - } - - async fn maintain_sorted_search_results( - rx: Receiver>, - paths_for_full_scan: Sender, - limit: usize, - ) { - let mut rx = pin!(rx); - let mut matched = 0; - while let Some(mut next_path_result) = rx.next().await { - let Some(successful_path) = next_path_result.next().await else { - // This math did not produce a match, hence skip it. - continue; - }; - if paths_for_full_scan.send(successful_path).await.is_err() { - return; - }; - matched += 1; - if matched >= limit { - break; - } - } - } - - /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf. 
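Earlier in this function the pipeline is fanned out over a scoped pool of background workers that all share the same channel endpoints and are joined before the search finishes. A rough std-only analogue using std::thread::scope (the reverted code runs on gpui's background executor rather than OS threads, and spawns num_cpus - 1 workers):

use std::sync::{Mutex, mpsc};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    // std receivers are not Clone, so share one behind a lock for the sketch.
    let rx = Mutex::new(rx);
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);

    thread::scope(|scope| {
        for id in 0..workers {
            let rx = &rx;
            scope.spawn(move || {
                loop {
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(path) => println!("worker {id} scanning {path}"),
                        Err(_) => break, // channel closed and drained
                    }
                }
            });
        }
        for path in ["a.rs", "b.rs", "c.rs"] {
            tx.send(path.to_string()).unwrap();
        }
        drop(tx); // closing the channel lets every worker exit
    }); // the scope joins all workers here, much like executor.scoped in the patch
}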
- async fn open_buffers( - buffer_store: &Entity, - rx: Receiver, - find_all_matches_tx: Sender>, - mut cx: AsyncApp, - ) { - let mut rx = pin!(rx.ready_chunks(64)); - _ = maybe!(async move { - while let Some(requested_paths) = rx.next().await { - let mut buffers = buffer_store.update(&mut cx, |this, cx| { - requested_paths - .into_iter() - .map(|path| this.open_buffer(path, cx)) - .collect::>() - })?; - - while let Some(buffer) = buffers.next().await { - if let Some(buffer) = buffer.log_err() { - find_all_matches_tx.send(buffer).await?; - } - } - } - Result::<_, anyhow::Error>::Ok(()) - }) - .await; - } - - async fn grab_buffer_snapshots( - rx: Receiver>, - find_all_matches_tx: Sender<(Entity, BufferSnapshot)>, - mut cx: AsyncApp, - ) { - _ = maybe!(async move { - while let Ok(buffer) = rx.recv().await { - let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?; - find_all_matches_tx.send((buffer, snapshot)).await?; - } - Result::<_, anyhow::Error>::Ok(()) - }) - .await; - } - - fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec> { - let worktree_store = self.worktree_store.read(cx); - let mut buffers = search_query - .buffers() - .into_iter() - .flatten() - .filter(|buffer| { - let b = buffer.read(cx); - if let Some(file) = b.file() { - if !search_query.match_path(file.path().as_std_path()) { - return false; - } - if !search_query.include_ignored() - && let Some(entry) = b - .entry_id(cx) - .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx)) - && entry.is_ignored - { - return false; - } - } - true - }) - .cloned() - .collect::>(); - buffers.sort_by(|a, b| { - let a = a.read(cx); - let b = b.read(cx); - match (a.file(), b.file()) { - (None, None) => a.remote_id().cmp(&b.remote_id()), - (None, Some(_)) => std::cmp::Ordering::Less, - (Some(_), None) => std::cmp::Ordering::Greater, - (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)), - } - }); - - buffers - } -} - -struct Worker<'search> { - query: &'search SearchQuery, - matched_buffer_count: &'search AtomicUsize, - matches_count: &'search AtomicUsize, - open_buffers: &'search HashSet, - candidates: FindSearchCandidates, - /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot. - find_all_matches_rx: Receiver<(Entity, BufferSnapshot)>, - /// Cool, we have results; let's share them with the world. 
- publish_matches: Sender, -} - -impl Worker<'_> { - async fn run(mut self) { - let ( - input_paths_rx, - confirm_contents_will_match_rx, - mut confirm_contents_will_match_tx, - mut get_buffer_for_full_scan_tx, - fs, - ) = match self.candidates { - FindSearchCandidates::Local { - fs, - input_paths_rx, - confirm_contents_will_match_rx, - confirm_contents_will_match_tx, - get_buffer_for_full_scan_tx, - } => ( - input_paths_rx, - confirm_contents_will_match_rx, - confirm_contents_will_match_tx, - get_buffer_for_full_scan_tx, - Some(fs), - ), - FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => ( - unbounded().1, - unbounded().1, - unbounded().0, - unbounded().0, - None, - ), - }; - let mut find_all_matches = pin!(self.find_all_matches_rx.fuse()); - let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse()); - let mut scan_path = pin!(input_paths_rx.fuse()); - - loop { - let handler = RequestHandler { - query: self.query, - open_entries: &self.open_buffers, - fs: fs.as_deref(), - matched_buffer_count: self.matched_buffer_count, - matches_count: self.matches_count, - confirm_contents_will_match_tx: &confirm_contents_will_match_tx, - get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx, - publish_matches: &self.publish_matches, - }; - // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent - // steps straight away. Another worker might be about to produce a value that will - // be pushed there, thus we'll replace current worker's pipe with a dummy one. - // That way, we'll only ever close a next-stage channel when ALL workers do so. - select_biased! { - find_all_matches = find_all_matches.next() => { - - if self.publish_matches.is_closed() { - break; - } - let Some(matches) = find_all_matches else { - self.publish_matches = bounded(1).0; - continue; - }; - let result = handler.handle_find_all_matches(matches).await; - if let Some(_should_bail) = result { - - self.publish_matches = bounded(1).0; - continue; - } - }, - find_first_match = find_first_match.next() => { - if let Some(buffer_with_at_least_one_match) = find_first_match { - handler.handle_find_first_match(buffer_with_at_least_one_match).await; - } else { - get_buffer_for_full_scan_tx = bounded(1).0; - } - - }, - scan_path = scan_path.next() => { - if let Some(path_to_scan) = scan_path { - handler.handle_scan_path(path_to_scan).await; - } else { - // If we're the last worker to notice that this is not producing values, close the upstream. 
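The swap-for-a-dummy trick described in these comments leans on a basic channel guarantee: a receiver only observes disconnection once every sender clone is gone, so a pipeline stage shuts down only after the last worker lets go of its end. A tiny std demonstration of that guarantee:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<&str>();
    // One sender clone per worker; the original is dropped right away.
    let senders: Vec<_> = (0..4).map(|_| tx.clone()).collect();
    drop(tx);

    let workers: Vec<_> = senders
        .into_iter()
        .map(|sender| {
            thread::spawn(move || {
                sender.send("match").unwrap();
                // Dropping this clone plays the role of a worker swapping its pipe for a
                // dummy: the downstream receiver only disconnects once every clone is gone.
            })
        })
        .collect();

    // iter() keeps yielding until the last sender clone has been dropped.
    assert_eq!(rx.iter().count(), 4);
    for worker in workers {
        worker.join().unwrap();
    }
}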
- confirm_contents_will_match_tx = bounded(1).0; - } - - } - complete => { - break - }, - - } - } - } -} - -struct RequestHandler<'worker> { - query: &'worker SearchQuery, - fs: Option<&'worker dyn Fs>, - open_entries: &'worker HashSet, - matched_buffer_count: &'worker AtomicUsize, - matches_count: &'worker AtomicUsize, - - confirm_contents_will_match_tx: &'worker Sender, - get_buffer_for_full_scan_tx: &'worker Sender, - publish_matches: &'worker Sender, -} - -struct LimitReached; - -impl RequestHandler<'_> { - async fn handle_find_all_matches( - &self, - (buffer, snapshot): (Entity, BufferSnapshot), - ) -> Option { - let ranges = self - .query - .search(&snapshot, None) - .await - .iter() - .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end)) - .collect::>(); - - let matched_ranges = ranges.len(); - if self.matched_buffer_count.fetch_add(1, Ordering::Release) - > Search::MAX_SEARCH_RESULT_FILES - || self - .matches_count - .fetch_add(matched_ranges, Ordering::Release) - > Search::MAX_SEARCH_RESULT_RANGES - { - _ = self.publish_matches.send(SearchResult::LimitReached).await; - Some(LimitReached) - } else { - _ = self - .publish_matches - .send(SearchResult::Buffer { buffer, ranges }) - .await; - None - } - } - async fn handle_find_first_match(&self, mut entry: MatchingEntry) { - _=maybe!(async move { - let abs_path = entry.worktree_root.join(entry.path.path.as_std_path()); - let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else { - return anyhow::Ok(()); - }; - - let mut file = BufReader::new(file); - let file_start = file.fill_buf()?; - - if let Err(Some(starting_position)) = - std::str::from_utf8(file_start).map_err(|e| e.error_len()) - { - // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on; - // That way we can still match files in a streaming fashion without having look at "obviously binary" files. - log::debug!( - "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}" - ); - return Ok(()); - } - - if self.query.detect(file).unwrap_or(false) { - // Yes, we should scan the whole file. - entry.should_scan_tx.send(entry.path).await?; - } - Ok(()) - }).await; - } - - async fn handle_scan_path(&self, req: InputPath) { - _ = maybe!(async move { - let InputPath { - entry, - - snapshot, - should_scan_tx, - } = req; - - if entry.is_fifo || !entry.is_file() { - return Ok(()); - } - - if self.query.filters_path() { - let matched_path = if self.query.match_full_paths() { - let mut full_path = snapshot.root_name().as_std_path().to_owned(); - full_path.push(entry.path.as_std_path()); - self.query.match_path(&full_path) - } else { - self.query.match_path(entry.path.as_std_path()) - }; - if !matched_path { - return Ok(()); - } - } - - if self.open_entries.contains(&entry.id) { - // The buffer is already in memory and that's the version we want to scan; - // hence skip the dilly-dally and look for all matches straight away. 
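The pre-filter above cheaply rejects "obviously binary" files by checking only the first buffered chunk for an invalid UTF-8 sequence before running the matcher; files that pass are sent on to be opened and fully scanned. A standalone sketch of that check, with a plain substring test over the first chunk standing in for streaming SearchQuery::detect over the whole reader:

use std::fs::File;
use std::io::{self, BufRead, BufReader};

/// Cheap pre-filter: look at the first buffered chunk only. Reject files whose chunk
/// contains a definite invalid UTF-8 sequence (probably binary); otherwise report
/// whether the chunk already contains `needle`.
fn chunk_looks_promising(path: &str, needle: &str) -> io::Result<bool> {
    let mut reader = BufReader::new(File::open(path)?);
    let chunk = reader.fill_buf()?;
    match std::str::from_utf8(chunk) {
        Ok(text) => Ok(text.contains(needle)),
        // error_len() == None means a code point was merely cut off at the end of the
        // buffer; only a definite invalid sequence marks the file as binary.
        Err(e) if e.error_len().is_none() => {
            let valid = &chunk[..e.valid_up_to()];
            Ok(std::str::from_utf8(valid).unwrap().contains(needle))
        }
        Err(_) => Ok(false),
    }
}

Files that pass this stage are then handed to the main thread to be opened as buffers, where the full scan computes the actual match ranges.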
- self.get_buffer_for_full_scan_tx - .send(ProjectPath { - worktree_id: snapshot.id(), - path: entry.path.clone(), - }) - .await?; - } else { - self.confirm_contents_will_match_tx - .send(MatchingEntry { - should_scan_tx: should_scan_tx, - worktree_root: snapshot.abs_path().clone(), - path: ProjectPath { - worktree_id: snapshot.id(), - path: entry.path.clone(), - }, - }) - .await?; - } - - anyhow::Ok(()) - }) - .await; - } -} - -struct InputPath { - entry: Entry, - snapshot: Snapshot, - should_scan_tx: oneshot::Sender, -} - -struct MatchingEntry { - worktree_root: Arc, - path: ProjectPath, - should_scan_tx: oneshot::Sender, -} diff --git a/crates/project/src/worktree_store.rs b/crates/project/src/worktree_store.rs index 676c96f4331d73b87d4bc16766a5f6c4d6194864..e6da207dadbde3ebc725fbb84ed19b3b35414f87 100644 --- a/crates/project/src/worktree_store.rs +++ b/crates/project/src/worktree_store.rs @@ -8,7 +8,10 @@ use std::{ use anyhow::{Context as _, Result, anyhow, bail}; use collections::{HashMap, HashSet}; use fs::{Fs, copy_recursive}; -use futures::{FutureExt, SinkExt, future::Shared}; +use futures::{ + FutureExt, SinkExt, + future::{BoxFuture, Shared}, +}; use gpui::{ App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity, }; @@ -996,14 +999,148 @@ impl WorktreeStore { matching_paths_rx } + fn scan_ignored_dir<'a>( + fs: &'a Arc, + snapshot: &'a worktree::Snapshot, + path: &'a RelPath, + query: &'a SearchQuery, + filter_tx: &'a Sender, + output_tx: &'a Sender>, + ) -> BoxFuture<'a, Result<()>> { + async move { + let abs_path = snapshot.absolutize(path); + let Some(mut files) = fs + .read_dir(&abs_path) + .await + .with_context(|| format!("listing ignored path {abs_path:?}")) + .log_err() + else { + return Ok(()); + }; + + let mut results = Vec::new(); + + while let Some(Ok(file)) = files.next().await { + let Some(metadata) = fs + .metadata(&file) + .await + .with_context(|| format!("fetching fs metadata for {abs_path:?}")) + .log_err() + .flatten() + else { + continue; + }; + if metadata.is_symlink || metadata.is_fifo { + continue; + } + let relative_path = file.strip_prefix(snapshot.abs_path())?; + let relative_path = RelPath::new(&relative_path, snapshot.path_style()) + .context("getting relative path")?; + results.push((relative_path.into_arc(), !metadata.is_dir)) + } + results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path)); + for (path, is_file) in results { + if is_file { + if query.filters_path() { + let matched_path = if query.match_full_paths() { + let mut full_path = snapshot.root_name().as_std_path().to_owned(); + full_path.push(path.as_std_path()); + query.match_path(&full_path) + } else { + query.match_path(&path.as_std_path()) + }; + if !matched_path { + continue; + } + } + let (tx, rx) = oneshot::channel(); + output_tx.send(rx).await?; + filter_tx + .send(MatchingEntry { + respond: tx, + worktree_root: snapshot.abs_path().clone(), + path: ProjectPath { + worktree_id: snapshot.id(), + path: path.into_arc(), + }, + }) + .await?; + } else { + Self::scan_ignored_dir(fs, snapshot, &path, query, filter_tx, output_tx) + .await?; + } + } + Ok(()) + } + .boxed() + } + async fn find_candidate_paths( - _: Arc, - _: Vec<(worktree::Snapshot, WorktreeSettings)>, - _: HashSet, - _: SearchQuery, - _: Sender, - _: Sender>, + fs: Arc, + snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>, + open_entries: HashSet, + query: SearchQuery, + filter_tx: Sender, + output_tx: Sender>, ) -> Result<()> { + for (snapshot, settings) in snapshots { + for entry 
in snapshot.entries(query.include_ignored(), 0) { + if entry.is_dir() && entry.is_ignored { + if !settings.is_path_excluded(&entry.path) { + Self::scan_ignored_dir( + &fs, + &snapshot, + &entry.path, + &query, + &filter_tx, + &output_tx, + ) + .await?; + } + continue; + } + + if entry.is_fifo || !entry.is_file() { + continue; + } + + if query.filters_path() { + let matched_path = if query.match_full_paths() { + let mut full_path = snapshot.root_name().as_std_path().to_owned(); + full_path.push(entry.path.as_std_path()); + query.match_path(&full_path) + } else { + query.match_path(entry.path.as_std_path()) + }; + if !matched_path { + continue; + } + } + + let (mut tx, rx) = oneshot::channel(); + + if open_entries.contains(&entry.id) { + tx.send(ProjectPath { + worktree_id: snapshot.id(), + path: entry.path.clone(), + }) + .await?; + } else { + filter_tx + .send(MatchingEntry { + respond: tx, + worktree_root: snapshot.abs_path().clone(), + path: ProjectPath { + worktree_id: snapshot.id(), + path: entry.path.clone(), + }, + }) + .await?; + } + + output_tx.send(rx).await?; + } + } Ok(()) } diff --git a/crates/project_benchmarks/Cargo.toml b/crates/project_benchmarks/Cargo.toml deleted file mode 100644 index 1171d468c649bdd9f76a44b3ef0155dc652c6034..0000000000000000000000000000000000000000 --- a/crates/project_benchmarks/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "project_benchmarks" -version = "0.1.0" -publish.workspace = true -edition.workspace = true - -[dependencies] -anyhow.workspace = true -clap.workspace = true -client.workspace = true -futures.workspace = true -gpui = { workspace = true, features = ["windows-manifest"] } -http_client = { workspace = true, features = ["test-support"]} -language.workspace = true -node_runtime.workspace = true -project.workspace = true -settings.workspace = true -watch.workspace = true - -[lints] -workspace = true diff --git a/crates/project_benchmarks/LICENSE-GPL b/crates/project_benchmarks/LICENSE-GPL deleted file mode 120000 index 89e542f750cd3860a0598eff0dc34b56d7336dc4..0000000000000000000000000000000000000000 --- a/crates/project_benchmarks/LICENSE-GPL +++ /dev/null @@ -1 +0,0 @@ -../../LICENSE-GPL \ No newline at end of file diff --git a/crates/project_benchmarks/src/main.rs b/crates/project_benchmarks/src/main.rs deleted file mode 100644 index 5075016665a072f172da461cffdf6c5dbcabb4ac..0000000000000000000000000000000000000000 --- a/crates/project_benchmarks/src/main.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::sync::Arc; - -use clap::Parser; -use client::{Client, UserStore}; -use gpui::{AppContext as _, Application}; -use http_client::FakeHttpClient; -use language::LanguageRegistry; -use node_runtime::NodeRuntime; -use project::{ - Project, RealFs, - search::{SearchQuery, SearchResult}, -}; - -#[derive(Parser)] -struct Args { - /// List of worktrees to run the search against. - worktrees: Vec, - #[clap(short)] - query: String, - /// Treat query as a regex. - #[clap(short, long)] - regex: bool, - /// Matches have to be standalone words. - #[clap(long)] - whole_word: bool, - /// Make matching case-sensitive. - #[clap(long, default_value_t = true)] - case_sensitive: bool, - /// Include gitignored files in the search. 
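The candidate walk above applies the same path pre-filter in both branches: when the query has to match full paths, the worktree's root name is joined onto the entry's relative path before matching; otherwise the relative path is matched directly. A compact sketch of that decision, assuming a simple closure in place of SearchQuery::match_path:

use std::path::{Path, PathBuf};

/// Decide whether an entry survives the path filter before its contents are read.
fn passes_path_filter(
    root_name: &Path,
    entry_path: &Path,
    match_full_paths: bool,
    matches: impl Fn(&Path) -> bool,
) -> bool {
    if match_full_paths {
        // e.g. "zed/crates/project/src/project.rs" instead of "crates/project/src/project.rs"
        let mut full_path = PathBuf::from(root_name);
        full_path.push(entry_path);
        matches(&full_path)
    } else {
        matches(entry_path)
    }
}

fn main() {
    let only_rust = |path: &Path| path.extension().is_some_and(|ext| ext == "rs");
    assert!(passes_path_filter(
        Path::new("zed"),
        Path::new("crates/project/src/project.rs"),
        true,
        only_rust,
    ));
}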
- #[clap(long)] - include_ignored: bool, -} - -fn main() -> Result<(), anyhow::Error> { - let args = Args::parse(); - let query = if args.regex { - SearchQuery::regex( - args.query, - args.whole_word, - args.case_sensitive, - args.include_ignored, - false, - Default::default(), - Default::default(), - false, - None, - ) - } else { - SearchQuery::text( - args.query, - args.whole_word, - args.case_sensitive, - args.include_ignored, - Default::default(), - Default::default(), - false, - None, - ) - }?; - Application::headless().run(|cx| { - settings::init(cx); - client::init_settings(cx); - language::init(cx); - Project::init_settings(cx); - let client = Client::production(cx); - let http_client = FakeHttpClient::with_200_response(); - let (_, rx) = watch::channel(None); - let node = NodeRuntime::new(http_client, None, rx); - let user_store = cx.new(|cx| UserStore::new(client.clone(), cx)); - let registry = Arc::new(LanguageRegistry::new(cx.background_executor().clone())); - let fs = Arc::new(RealFs::new(None, cx.background_executor().clone())); - let project = Project::local( - client, - node, - user_store, - registry, - fs, - Some(Default::default()), - cx, - ); - - project.clone().update(cx, move |_, cx| { - cx.spawn(async move |_, cx| { - println!("Loading worktrees"); - let worktrees = project.update(cx, |this, cx| { - args.worktrees - .into_iter() - .map(|worktree| this.find_or_create_worktree(worktree, true, cx)) - .collect::>() - })?; - - let worktrees = futures::future::join_all(worktrees) - .await - .into_iter() - .collect::, anyhow::Error>>()?; - - for (worktree, _) in &worktrees { - worktree - .update(cx, |this, _| this.as_local().unwrap().scan_complete())? - .await; - } - println!("Worktrees loaded"); - - println!("Starting a project search"); - let timer = std::time::Instant::now(); - let mut first_match = None; - let matches = project - .update(cx, |this, cx| this.search(query, cx)) - .unwrap(); - let mut matched_files = 0; - let mut matched_chunks = 0; - while let Ok(match_result) = matches.recv().await { - if first_match.is_none() { - let time = timer.elapsed(); - first_match = Some(time); - println!("First match found after {time:?}"); - } - if let SearchResult::Buffer { ranges, .. } = match_result { - matched_files += 1; - matched_chunks += ranges.len(); - } - } - let elapsed = timer.elapsed(); - println!( - "Finished project search after {elapsed:?}. 
Matched {matched_files} files and {matched_chunks} excerpts" - ); - drop(project); - cx.update(|cx| cx.quit())?; - - anyhow::Ok(()) - }) - .detach(); - }); - }); - Ok(()) -} diff --git a/crates/remote_server/Cargo.toml b/crates/remote_server/Cargo.toml index b702c75119af9f49707c079a3a799ae8c59209b1..3d28f6ba565330a5fc3c0ea0249aaf760c880439 100644 --- a/crates/remote_server/Cargo.toml +++ b/crates/remote_server/Cargo.toml @@ -75,7 +75,7 @@ minidumper.workspace = true [dev-dependencies] action_log.workspace = true -agent = { workspace = true, features = ["test-support"] } +agent.workspace = true client = { workspace = true, features = ["test-support"] } clock = { workspace = true, features = ["test-support"] } collections.workspace = true diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index caec0f5c1a4829ad0117469d705567ccf557ef46..534eae6f44986afa42b6d202e4f34691935b3b33 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -640,15 +640,9 @@ impl HeadlessProject { PathStyle::local(), )?; let results = this.update(&mut cx, |this, cx| { - project::Search::local( - this.fs.clone(), - this.buffer_store.clone(), - this.worktree_store.clone(), - message.limit as _, - cx, - ) - .into_handle(query, cx) - .matching_buffers(cx) + this.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx) + }) })?; let mut response = proto::FindSearchCandidatesResponse {
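On the headless side, the restored handler simply drains the candidate receiver and reports buffer IDs back over proto. A minimal sketch of that collect step, with a plain u64 standing in for the remote buffer handle and a simplified response type (not the generated proto struct):

use std::sync::mpsc::Receiver;

/// Simplified stand-in for proto::FindSearchCandidatesResponse.
#[derive(Default)]
struct CandidatesResponse {
    buffer_ids: Vec<u64>,
}

fn collect_candidates(candidates: Receiver<u64>) -> CandidatesResponse {
    let mut response = CandidatesResponse::default();
    // Drain the channel: the search side closes it when it runs out of candidates
    // or hits its limit, which ends this loop.
    while let Ok(buffer_id) = candidates.recv() {
        response.buffer_ids.push(buffer_id);
    }
    response
}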