use std::{
    cell::LazyCell,
    collections::BTreeSet,
    io::{BufRead, BufReader},
    ops::Range,
    path::{Path, PathBuf},
    pin::pin,
    sync::Arc,
    time::Duration,
};

use anyhow::Context;
use collections::HashSet;
use fs::Fs;
use futures::{FutureExt as _, SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
use gpui::{App, AppContext, AsyncApp, BackgroundExecutor, Entity, Priority, Task};
use language::{Buffer, BufferSnapshot};
use parking_lot::Mutex;
use postage::oneshot;
use rpc::{AnyProtoClient, proto};
use smol::channel::{Receiver, Sender, bounded, unbounded};

use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};

use crate::{
    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
    worktree_store::WorktreeStore,
};

pub struct Search {
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    limit: usize,
    kind: SearchKind,
}

/// Represents search setup, before it is actually kicked off with [`Search::into_handle`].
enum SearchKind {
    /// Search for candidates by inspecting file contents on the file system, avoiding loading
    /// a buffer unless we know that the file contains a match.
    Local {
        fs: Arc<dyn Fs>,
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Query the remote host for candidates. As of writing, the host runs a local search in
    /// "buffers with matches only" mode.
    Remote {
        client: AnyProtoClient,
        remote_id: u64,
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
    /// Run the search against a known set of candidates. Even when working with a remote host,
    /// this won't round-trip to the host.
    OpenBuffersOnly,
}

/// Represents the results of a project search and allows one to obtain either the match positions OR
/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than
/// obtaining full match positions, because then we look for at most one match in each file.
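///
/// A minimal usage sketch (the `search` and `query` values, and a `&mut App` context, are assumed
/// to be in scope):
/// ```ignore
/// let handle = search.into_handle(query, cx);
/// // Consuming the handle picks the mode; the receiver for the other mode is dropped.
/// let results = handle.results(cx);
/// while let Ok(result) = results.rx.recv().await {
///     // ... handle `SearchResult::Buffer { buffer, ranges }` ...
/// }
/// ```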
#[must_use]
pub struct SearchResultsHandle {
    results: Receiver<SearchResult>,
    matching_buffers: Receiver<Entity<Buffer>>,
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}

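/// A search that has been kicked off: the task driving it plus the receiver yielding its output.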
pub struct SearchResults<T> {
    pub _task_handle: Task<()>,
    pub rx: Receiver<T>,
}

impl SearchResultsHandle {
    pub fn results(self, cx: &mut App) -> SearchResults<SearchResult> {
        SearchResults {
            _task_handle: (self.trigger_search)(cx),
            rx: self.results,
        }
    }
    pub fn matching_buffers(self, cx: &mut App) -> SearchResults<Entity<Buffer>> {
        SearchResults {
            _task_handle: (self.trigger_search)(cx),
            rx: self.matching_buffers,
        }
    }
}

#[derive(Clone)]
enum FindSearchCandidates {
    Local {
        fs: Arc<dyn Fs>,
        /// Start off with all paths in the project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        ///
        /// Put another way: filter out files that can't match (without looking at file contents).
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we'll figure out whether it contains at
        /// least one match based on its contents on disk. This step is not performed for buffers
        /// we already have in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    },
    Remote,
    OpenBuffersOnly,
}

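// A bird's-eye view of the local pipeline (a summary of the channels wired up in `into_handle`):
// input paths are filtered, then unloaded files are scanned on disk to confirm they contain at
// least one match, then confirmed paths are opened as buffers, snapshotted, and finally scanned
// for all matches, with results reported in path order.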
impl Search {
    pub fn local(
        fs: Arc<dyn Fs>,
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
        cx: &mut App,
    ) -> Self {
        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
        Self {
            kind: SearchKind::Local { fs, worktrees },
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) fn remote(
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
    ) -> Self {
        Self {
            kind: SearchKind::Remote {
                client: client_state.0,
                remote_id: client_state.1,
                models: client_state.2,
            },
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) fn open_buffers_only(
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
    ) -> Self {
        Self {
            kind: SearchKind::OpenBuffersOnly,
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to
    /// specify whether you're interested in matching buffers or full search results.
    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                self.limit = self.limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }
        let open_buffers = Arc::new(open_buffers);
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(move |cx: &mut App| {
            cx.spawn(async move |cx| {
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
                let query = Arc::new(query);
                let (candidate_searcher, tasks) = match self.kind {
                    SearchKind::OpenBuffersOnly => {
                        let open_buffers = cx.update(|cx| self.all_loaded_buffers(&query, cx));
                        let fill_requests = cx
                            .background_spawn(async move {
                                for buffer in open_buffers {
                                    if grab_buffer_snapshot_tx.send(buffer).await.is_err() {
                                        return;
                                    }
                                }
                            })
                            .boxed_local();
                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
                    }
                    SearchKind::Local {
                        fs,
                        ref mut worktrees,
                    } => {
                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
                            unbounded();
                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                            bounded(64);
                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();

                        let (input_paths_tx, input_paths_rx) = unbounded();
                        let tasks = vec![
                            cx.spawn(Self::provide_search_paths(
                                std::mem::take(worktrees),
                                query.clone(),
                                input_paths_tx,
                                sorted_search_results_tx,
                            ))
                            .boxed_local(),
                            Self::open_buffers(
                                self.buffer_store,
                                get_buffer_for_full_scan_rx,
                                grab_buffer_snapshot_tx,
                                cx.clone(),
                            )
                            .boxed_local(),
                            cx.background_spawn(Self::maintain_sorted_search_results(
                                sorted_search_results_rx,
                                get_buffer_for_full_scan_tx,
                                self.limit,
                            ))
                            .boxed_local(),
                        ];
                        (
                            FindSearchCandidates::Local {
                                fs,
                                confirm_contents_will_match_tx,
                                confirm_contents_will_match_rx,
                                input_paths_rx,
                            },
                            tasks,
                        )
                    }
                    SearchKind::Remote {
                        client,
                        remote_id,
                        models,
                    } => {
                        let (handle, rx) = self
                            .buffer_store
                            .update(cx, |this, _| this.register_project_search_result_handle());

                        let cancel_ongoing_search = util::defer({
                            let client = client.clone();
                            move || {
                                _ = client.send(proto::FindSearchCandidatesCancelled {
                                    project_id: remote_id,
                                    handle,
                                });
                            }
                        });
                        let request = client.request(proto::FindSearchCandidates {
                            project_id: remote_id,
                            query: Some(query.to_proto()),
                            limit: self.limit as _,
                            handle,
                        });

                        let buffer_store = self.buffer_store;
                        let guard = cx.update(|cx| {
                            Project::retain_remotely_created_models_impl(
                                &models,
                                &buffer_store,
                                &self.worktree_store,
                                cx,
                            )
                        });

                        let issue_remote_buffers_request = cx
                            .spawn(async move |cx| {
                                let _ = maybe!(async move {
                                    request.await?;

                                    let (buffer_tx, buffer_rx) = bounded(24);

                                    let wait_for_remote_buffers = cx.spawn(async move |cx| {
                                        while let Ok(buffer_id) = rx.recv().await {
                                            let buffer =
                                                buffer_store.update(cx, |buffer_store, cx| {
                                                    buffer_store
                                                        .wait_for_remote_buffer(buffer_id, cx)
                                                });
                                            buffer_tx.send(buffer).await?;
                                        }
                                        anyhow::Ok(())
                                    });

                                    let forward_buffers = cx.background_spawn(async move {
                                        while let Ok(buffer) = buffer_rx.recv().await {
                                            let _ =
                                                grab_buffer_snapshot_tx.send(buffer.await?).await;
                                        }
                                        anyhow::Ok(())
                                    });
                                    let (left, right) = futures::future::join(
                                        wait_for_remote_buffers,
                                        forward_buffers,
                                    )
                                    .await;
                                    left?;
                                    right?;

                                    drop(guard);
                                    cancel_ongoing_search.abort();
                                    anyhow::Ok(())
                                })
                                .await
                                .log_err();
                            })
                            .boxed_local();
                        (
                            FindSearchCandidates::Remote,
                            vec![issue_remote_buffers_request],
                        )
                    }
                };

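                // If the caller only asked for matching buffers, the results receiver held by the
                // handle has been dropped by the time this task runs, closing this channel; in
                // that case we can skip looking for full match ranges altogether.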
                let should_find_all_matches = !tx.is_closed();

                let _executor = executor.clone();
                let worker_pool = executor.spawn(async move {
                    let num_cpus = _executor.num_cpus();

                    assert!(num_cpus > 0);
                    _executor
                        .scoped(|scope| {
                            for _ in 0..num_cpus - 1 {
                                let worker = Worker {
                                    query: query.clone(),
                                    open_buffers: open_buffers.clone(),
                                    candidates: candidate_searcher.clone(),
                                    find_all_matches_rx: find_all_matches_rx.clone(),
                                };
                                scope.spawn(worker.run());
                            }

                            drop(find_all_matches_rx);
                            drop(candidate_searcher);
                        })
                        .await;
                });

                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
                // The caller of `into_handle` decides whether they're interested in all matches
                // (files that matched + all matching ranges) or just the files. *They consume the
                // same stream as the guts of the project search do.* This means that we cannot
                // grab values off of that stream unless it's strictly needed for making progress
                // in the project search.
                //
                // Grabbing buffer snapshots is only necessary when we're looking for all matches.
                // If the caller decided that they're not interested in all matches, running that
                // task unconditionally would hinder the caller's ability to observe all matching
                // file paths.
                let buffer_snapshots = if should_find_all_matches {
                    Some(
                        Self::grab_buffer_snapshots(
                            grab_buffer_snapshot_rx,
                            find_all_matches_tx,
                            sorted_matches_tx,
                            cx.clone(),
                        )
                        .boxed_local(),
                    )
                } else {
                    drop(find_all_matches_tx);

                    None
                };
                let ensure_matches_are_reported_in_order = if should_find_all_matches {
                    Some(
                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
                            .boxed_local(),
                    )
                } else {
                    drop(tx);
                    None
                };

                futures::future::join_all(
                    [worker_pool.boxed_local()]
                        .into_iter()
                        .chain(buffer_snapshots)
                        .chain(ensure_matches_are_reported_in_order)
                        .chain(tasks),
                )
                .await;
            })
        });

        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }

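    /// Feeds every file in the given worktrees into the pipeline, pairing each path with a oneshot
    /// receiver over which the path is reported back if the file turns out to contain a match.
    /// When ignored files are included in the search, gitignored directories that could contain
    /// matches are scanned into the worktree first.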
    fn provide_search_paths(
        worktrees: Vec<Entity<Worktree>>,
        query: Arc<SearchQuery>,
        tx: Sender<InputPath>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> impl AsyncFnOnce(&mut AsyncApp) {
        async move |cx| {
            _ = maybe!(async move {
                let gitignored_tracker = PathInclusionMatcher::new(query.clone());
                let include_ignored = query.include_ignored();
                for worktree in worktrees {
                    let (mut snapshot, worktree_settings) = worktree
                        .read_with(cx, |this, _| {
                            Some((this.snapshot(), this.as_local()?.settings()))
                        })
                        .context("The worktree is not local")?;
                    if include_ignored {
                        // Pre-fetch all of the ignored directories, as they're going to be searched.
                        let mut entries_to_refresh = vec![];

                        for entry in snapshot.entries(include_ignored, 0) {
                            if gitignored_tracker.should_scan_gitignored_dir(
                                entry,
                                &snapshot,
                                &worktree_settings,
                            ) {
                                entries_to_refresh.push(entry.path.clone());
                            }
                        }
                        let barrier = worktree.update(cx, |this, _| {
                            let local = this.as_local_mut()?;
                            let barrier = entries_to_refresh
                                .into_iter()
                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
                                .collect::<Vec<_>>();
                            Some(barrier)
                        });
                        if let Some(barriers) = barrier {
                            futures::future::join_all(barriers).await;
                        }
                        snapshot = worktree.read_with(cx, |this, _| this.snapshot());
                    }
                    let tx = tx.clone();
                    let results = results.clone();

                    cx.background_executor()
                        .spawn(async move {
                            for entry in snapshot.files(include_ignored, 0) {
                                let (should_scan_tx, should_scan_rx) = oneshot::channel();

                                let Ok(_) = tx
                                    .send(InputPath {
                                        entry: entry.clone(),
                                        snapshot: snapshot.clone(),
                                        should_scan_tx,
                                    })
                                    .await
                                else {
                                    return;
                                };
                                if results.send(should_scan_rx).await.is_err() {
                                    return;
                                };
                            }
                        })
                        .await;
                }
                anyhow::Ok(())
            })
            .await;
        }
    }

    async fn maintain_sorted_search_results(
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
        limit: usize,
    ) {
        let mut rx = pin!(rx);
        let mut matched = 0;
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This file did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
            matched += 1;
            if matched >= limit {
                break;
            }
        }
    }

    /// Background workers cannot open buffers by themselves, hence the main thread does it on
    /// their behalf.
    async fn open_buffers(
        buffer_store: Entity<BufferStore>,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<Entity<Buffer>>,
        mut cx: AsyncApp,
    ) {
        let mut rx = pin!(rx.ready_chunks(64));
        _ = maybe!(async move {
            while let Some(requested_paths) = rx.next().await {
                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
                    requested_paths
                        .into_iter()
                        .map(|path| this.open_buffer(path, cx))
                        .collect::<FuturesOrdered<_>>()
                });

                while let Some(buffer) = buffers.next().await {
                    if let Some(buffer) = buffer.log_err() {
                        find_all_matches_tx.send(buffer).await?;
                    }
                }
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

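    /// Snapshotting a buffer requires the main thread; each snapshot is then handed to the
    /// background workers together with a oneshot over which they report all matches found in it.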
    async fn grab_buffer_snapshots(
        rx: Receiver<Entity<Buffer>>,
        find_all_matches_tx: Sender<(
            Entity<Buffer>,
            BufferSnapshot,
            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
        )>,
        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(buffer) = rx.recv().await {
                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot());
                let (tx, rx) = oneshot::channel();
                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
                results.send(rx).await?;
            }
            debug_assert!(rx.is_empty());
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

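    /// Forwards finished per-buffer results downstream in the order the buffers were submitted,
    /// enforcing [`Search::MAX_SEARCH_RESULT_FILES`] and [`Search::MAX_SEARCH_RESULT_RANGES`].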
    async fn ensure_matched_ranges_are_reported_in_order(
        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
        tx: Sender<SearchResult>,
    ) {
        use postage::stream::Stream;
        _ = maybe!(async move {
            let mut matched_buffers = 0;
            let mut matches = 0;
            while let Ok(mut next_buffer_matches) = rx.recv().await {
                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
                    continue;
                };

                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
                    || matches > Search::MAX_SEARCH_RESULT_RANGES
                {
                    _ = tx.send(SearchResult::LimitReached).await;
                    break;
                }
                matched_buffers += 1;
                matches += ranges.len();

                tx.send(SearchResult::Buffer { buffer, ranges }).await?;
            }
            anyhow::Ok(())
        })
        .await;
    }

    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
        let worktree_store = self.worktree_store.read(cx);
        let mut buffers = search_query
            .buffers()
            .into_iter()
            .flatten()
            .filter(|buffer| {
                let b = buffer.read(cx);
                if let Some(file) = b.file() {
                    if !search_query.match_path(file.path()) {
                        return false;
                    }
                    if !search_query.include_ignored()
                        && let Some(entry) = b
                            .entry_id(cx)
                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
                        && entry.is_ignored
                    {
                        return false;
                    }
                }
                true
            })
            .cloned()
            .collect::<Vec<_>>();
        buffers.sort_by(|a, b| {
            let a = a.read(cx);
            let b = b.read(cx);
            match (a.file(), b.file()) {
                (None, None) => a.remote_id().cmp(&b.remote_id()),
                (None, Some(_)) => std::cmp::Ordering::Less,
                (Some(_), None) => std::cmp::Ordering::Greater,
                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
            }
        });

        buffers
    }
}

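/// A background worker serving all three pipeline stages. The `select_biased!` in [`Worker::run`]
/// prefers later stages over earlier ones, so in-flight work drains before new paths are admitted.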
struct Worker {
    query: Arc<SearchQuery>,
    open_buffers: Arc<HashSet<ProjectEntryId>>,
    candidates: FindSearchCandidates,
    /// Ok, we're back in the background: run a full scan & find all matches in a given buffer
    /// snapshot. Then, when you're done, share them via the channel you were given.
    find_all_matches_rx: Receiver<(
        Entity<Buffer>,
        BufferSnapshot,
        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
    )>,
}

impl Worker {
    async fn run(self) {
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            fs,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                fs,
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                Some(fs),
            ),
            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => {
                (unbounded().1, unbounded().1, unbounded().0, None)
            }
        };
        // Workers grab whatever request is ready: WorkerA may grab "find all matches in file/a"
        // (which takes 5 minutes) while, right after, WorkerB grabs "find all matches in file/b"
        // (which takes 5 seconds); results are still reported in order downstream, via the
        // per-request oneshot channels.
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: &self.query,
                open_entries: &self.open_buffers,
                fs: fs.as_deref(),
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
            };
            // Whenever we notice that some step of the pipeline is closed, we don't want to close
            // subsequent steps straight away. Another worker might be about to produce a value
            // that will be pushed there, thus we'll replace the current worker's pipe with a dummy
            // one. That way, we'll only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    let Some(matches) = find_all_matches else {
                        continue;
                    };
                    handler.handle_find_all_matches(matches).await;
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this stage is not producing
                        // values, dropping our sender here closes the next stage's channel too.
                        confirm_contents_will_match_tx = bounded(1).0;
                    }
                }
                complete => {
                    break
                },
            }
        }
    }
}

struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: Option<&'worker dyn Fs>,
    open_entries: &'worker HashSet<ProjectEntryId>,
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
}

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot, mut report_matches): (
            Entity<Buffer>,
            BufferSnapshot,
            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
        ),
    ) {
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        _ = report_matches.send((buffer, ranges)).await;
    }

    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self
                .fs
                .context("Trying to query the filesystem in a remote project search")?
                .open_sync(&abs_path)
                .await
                .log_err()
            else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file's contents, throw away files that have
                // invalid UTF-8 sequences early on; that way we can still match files in a
                // streaming fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                return Ok(());
            }

            if self.query.detect(file).await.unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

    async fn handle_scan_path(&self, req: InputPath) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                snapshot,
                mut should_scan_tx,
            } = req;

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().to_owned();
                    full_path.push(&entry.path);
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(&entry.path)
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory, and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                should_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

struct InputPath {
    entry: Entry,
    snapshot: Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

/// Encapsulates the logic that decides whether a given gitignored directory should be scanned,
/// based on the include/exclude patterns of a search query (as include/exclude parameters may
/// match paths inside it). It is, in a way, an inverse glob match: given a glob pattern like
/// `src/**/` and a parent path like `src`, we need to decide whether the parent may contain
/// glob hits.
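///
/// For instance (a hypothetical include glob): with `src/foo/**/*.rs` the retained prefix is
/// `src/foo`, so both `src` (an ancestor of the prefix) and `src/foo/bar` (a descendant of it)
/// warrant scanning, while `tests` does not.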
struct PathInclusionMatcher {
    included: BTreeSet<PathBuf>,
    query: Arc<SearchQuery>,
}

impl PathInclusionMatcher {
    fn new(query: Arc<SearchQuery>) -> Self {
        let mut included = BTreeSet::new();
        // To do an inverse glob match, we split each glob into its prefix and the glob part.
        // For example, `src/**/*.rs` becomes `src/` and `**/*.rs`. The glob part gets dropped.
        // Then, when checking whether a given directory should be scanned, we check whether it
        // is a non-empty substring of any glob prefix.
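        // (Concretely, the prefix comes from `wax::Glob::partition`, which splits a glob into
        // its invariant prefix and the remaining variant pattern.)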
        if query.filters_path() {
            included.extend(
                query
                    .files_to_include()
                    .sources()
                    .flat_map(|glob| Some(wax::Glob::new(glob).ok()?.partition().0)),
            );
        }
        Self { included, query }
    }

    fn should_scan_gitignored_dir(
        &self,
        entry: &Entry,
        snapshot: &Snapshot,
        worktree_settings: &WorktreeSettings,
    ) -> bool {
        if !entry.is_ignored || !entry.kind.is_unloaded() {
            return false;
        }
        if !self.query.include_ignored() {
            return false;
        }
        if worktree_settings.is_path_excluded(&entry.path) {
            return false;
        }
        if !self.query.filters_path() {
            return true;
        }

        let as_abs_path = LazyCell::new(move || snapshot.absolutize(&entry.path));
        let entry_path = &entry.path;
        // Check exclusions (pruning):
        // if the current path is a child of an excluded path, we stop.
        let is_excluded = self.path_is_definitely_excluded(entry_path, snapshot);

        if is_excluded {
            return false;
        }

        // Check inclusions (traversal).
        if self.included.is_empty() {
            return true;
        }

        // We scan if the current path is a descendant of an include prefix
        // OR if the current path is an ancestor of an include prefix (we need to go deeper to find it).
        let is_included = self.included.iter().any(|prefix| {
            let (prefix_matches_entry, entry_matches_prefix) = if prefix.is_absolute() {
                (
                    prefix.starts_with(&**as_abs_path),
                    as_abs_path.starts_with(prefix),
                )
            } else {
                RelPath::new(prefix, snapshot.path_style()).map_or((false, false), |prefix| {
                    (
                        prefix.starts_with(entry_path),
                        entry_path.starts_with(&prefix),
                    )
                })
            };

            // Logic:
            // 1. entry_matches_prefix: we are inside the target zone (e.g. glob: src/, current: src/lib/). Keep scanning.
            // 2. prefix_matches_entry: we are above the target zone (e.g. glob: src/foo/, current: src/). Keep scanning to reach foo.
            prefix_matches_entry || entry_matches_prefix
        });

        is_included
    }

    fn path_is_definitely_excluded(&self, path: &RelPath, snapshot: &Snapshot) -> bool {
        if self.query.files_to_exclude().sources().next().is_some() {
            let mut path = if self.query.match_full_paths() {
                let mut full_path = snapshot.root_name().to_owned();
                full_path.push(path);
                full_path
            } else {
                path.to_owned()
            };
            loop {
                if self.query.files_to_exclude().is_match(&path) {
                    return true;
                } else if !path.pop() {
                    return false;
                }
            }
        } else {
            false
        }
    }
}

type IsTerminating = bool;

/// Adaptive batcher that starts eager (small batches) and grows the batch window when items
/// arrive quickly, reducing RPC overhead while preserving low latency for slow streams.
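///
/// A usage sketch (assuming a `BackgroundExecutor` is at hand):
/// ```ignore
/// let (batcher, batches_rx) = AdaptiveBatcher::new(cx.background_executor());
/// batcher.push(item).await;
/// // ... push more items ...
/// batcher.flush().await; // drains remaining items and stops the batching task
/// while let Ok(batch) = batches_rx.recv().await {
///     // e.g. send one RPC per batch
/// }
/// ```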
pub struct AdaptiveBatcher<T> {
    items: Sender<T>,
    flush_batch: Sender<IsTerminating>,
    _batch_task: Task<()>,
}

impl<T: 'static + Send> AdaptiveBatcher<T> {
    pub fn new(cx: &BackgroundExecutor) -> (Self, Receiver<Vec<T>>) {
        let (items, rx) = unbounded();
        let (batch_tx, batch_rx) = unbounded();
        let (flush_batch_tx, flush_batch_rx) = unbounded();
        let flush_batch = flush_batch_tx.clone();
        let executor = cx.clone();
        let _batch_task = cx.spawn_with_priority(Priority::High, async move {
            let mut current_batch = vec![];
            let mut items_produced_so_far = 0_u64;

            let mut _schedule_flush_after_delay: Option<Task<()>> = None;
            let _time_elapsed_since_start_of_search = std::time::Instant::now();
            let mut flush = pin!(flush_batch_rx);
            let mut terminating = false;
            loop {
                select_biased! {
                    item = rx.recv().fuse() => {
                        match item {
                            Ok(new_item) => {
                                let is_fresh_batch = current_batch.is_empty();
                                items_produced_so_far += 1;
                                current_batch.push(new_item);
                                if is_fresh_batch {
                                    // Chosen arbitrarily, based on some experimentation with plots.
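                                    // For a sense of scale: after 1 item the window is 20ms,
                                    // after ~30 items 100ms, after ~1000 items 180ms; the 300ms
                                    // cap kicks in around 32k items.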
                                    let desired_duration_ms =
                                        (20 * (items_produced_so_far + 2).ilog2() as u64).min(300);
                                    let desired_duration = Duration::from_millis(desired_duration_ms);
                                    let _executor = executor.clone();
                                    let _flush = flush_batch_tx.clone();
                                    let new_timer = executor.spawn_with_priority(Priority::High, async move {
                                        _executor.timer(desired_duration).await;
                                        _ = _flush.send(false).await;
                                    });
                                    _schedule_flush_after_delay = Some(new_timer);
                                }
                            }
                            Err(_) => {
                                // Items channel closed; send any remaining batch before exiting.
                                if !current_batch.is_empty() {
                                    _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
                                }
                                break;
                            }
                        }
                    }
                    should_break_afterwards = flush.next() => {
                        if !current_batch.is_empty() {
                            _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
                            _schedule_flush_after_delay = None;
                        }
                        if should_break_afterwards.unwrap_or_default() {
                            terminating = true;
                        }
                    }
                    complete => {
                        break;
                    }
                }
                if terminating {
                    // Drain any remaining items before exiting.
                    while let Ok(new_item) = rx.try_recv() {
                        current_batch.push(new_item);
                    }
                    if !current_batch.is_empty() {
                        _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
                    }
                    break;
                }
            }
        });
        let this = Self {
            items,
            _batch_task,
            flush_batch,
        };
        (this, batch_rx)
    }

    pub async fn push(&self, item: T) {
        _ = self.items.send(item).await;
    }

    pub async fn flush(self) {
        _ = self.flush_batch.send(true).await;
        self._batch_task.await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use fs::FakeFs;
    use serde_json::json;
    use settings::Settings;
    use util::{
        path,
        paths::{PathMatcher, PathStyle},
        rel_path::RelPath,
    };
    use worktree::{Entry, EntryKind, WorktreeSettings};

    use crate::{
        Project, project_search::PathInclusionMatcher, project_tests::init_test,
        search::SearchQuery,
    };

    #[gpui::test]
    async fn test_path_inclusion_matcher(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.background_executor.clone());
        fs.insert_tree(
            "/root",
            json!({
                ".gitignore": "src/data/\n",
                "src": {
                    "data": {
                        "main.csv": "field_1,field_2,field_3",
                    },
                    "lib": {
                        "main.txt": "Are you familiar with fields?",
                    },
                },
            }),
        )
        .await;

        let project = Project::test(fs.clone(), [path!("/root").as_ref()], cx).await;
        let worktree = project.update(cx, |project, cx| project.worktrees(cx).next().unwrap());
        let (worktree_settings, worktree_snapshot) = worktree.update(cx, |worktree, cx| {
            let settings_location = worktree.settings_location(cx);
            (
                WorktreeSettings::get(Some(settings_location), cx).clone(),
                worktree.snapshot(),
            )
        });

        // Manually create a test entry for the gitignored directory, since it won't
        // be loaded by the worktree.
        let entry = Entry {
            id: ProjectEntryId::from_proto(1),
            kind: EntryKind::UnloadedDir,
            path: Arc::from(RelPath::unix(Path::new("src/data")).unwrap()),
            inode: 0,
            mtime: None,
            canonical_path: None,
            is_ignored: true,
            is_hidden: false,
            is_always_included: false,
            is_external: false,
            is_private: false,
            size: 0,
            char_bag: Default::default(),
            is_fifo: false,
        };

        // 1. Search for `field`, including ignored files, without any inclusion or
        // exclusion filters: the gitignored directory should be scanned.
        let include_ignored = true;
        let files_to_include = PathMatcher::default();
        let files_to_exclude = PathMatcher::default();
        let match_full_paths = false;
        let search_query = SearchQuery::text(
            "field",
            false,
            false,
            include_ignored,
            files_to_include,
            files_to_exclude,
            match_full_paths,
            None,
        )
        .unwrap();

        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
        assert!(path_matcher.should_scan_gitignored_dir(
            &entry,
            &worktree_snapshot,
            &worktree_settings
        ));

        // 2. Search for `field`, including ignored files, but with `files_to_include`
        // restricted to files under `src/lib`: the gitignored directory should be skipped.
        let include_ignored = true;
        let files_to_include = PathMatcher::new(vec!["src/lib"], PathStyle::Posix).unwrap();
        let files_to_exclude = PathMatcher::default();
        let match_full_paths = false;
        let search_query = SearchQuery::text(
            "field",
            false,
            false,
            include_ignored,
            files_to_include,
            files_to_exclude,
            match_full_paths,
            None,
        )
        .unwrap();

        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
        assert!(!path_matcher.should_scan_gitignored_dir(
            &entry,
            &worktree_snapshot,
            &worktree_settings
        ));
    }
}