1use std::{
2 cell::LazyCell,
3 collections::BTreeSet,
4 io::{BufRead, BufReader},
5 ops::Range,
6 path::{Path, PathBuf},
7 pin::pin,
8 sync::Arc,
9 time::Duration,
10};
11
12use anyhow::Context;
13use collections::HashSet;
14use fs::Fs;
15use futures::FutureExt as _;
16use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
17use gpui::{App, AppContext, AsyncApp, BackgroundExecutor, Entity, Priority, Task};
18use language::{Buffer, BufferSnapshot};
19use parking_lot::Mutex;
20use postage::oneshot;
21use rpc::{AnyProtoClient, proto};
22use smol::channel::{Receiver, Sender, bounded, unbounded};
23
24use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
25use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
26
27use crate::{
28 Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
29 buffer_store::BufferStore,
30 search::{SearchQuery, SearchResult},
31 worktree_store::WorktreeStore,
32};
33
34pub struct Search {
35 buffer_store: Entity<BufferStore>,
36 worktree_store: Entity<WorktreeStore>,
37 limit: usize,
38 kind: SearchKind,
39}
40
41/// Represents search setup, before it is actually kicked off with Search::into_results
42enum SearchKind {
43 /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match.
44 Local {
45 fs: Arc<dyn Fs>,
46 worktrees: Vec<Entity<Worktree>>,
47 },
48 /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
49 Remote {
50 client: AnyProtoClient,
51 remote_id: u64,
52 models: Arc<Mutex<RemotelyCreatedModels>>,
53 },
54 /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host.
55 OpenBuffersOnly,
56}
57
58/// Represents results of project search and allows one to either obtain match positions OR
59/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for
60/// at most one match in each file.
61#[must_use]
62pub struct SearchResultsHandle {
63 results: Receiver<SearchResult>,
64 matching_buffers: Receiver<Entity<Buffer>>,
65 trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
66}
67
68pub struct SearchResults<T> {
69 pub _task_handle: Task<()>,
70 pub rx: Receiver<T>,
71}
72impl SearchResultsHandle {
73 pub fn results(self, cx: &mut App) -> SearchResults<SearchResult> {
74 SearchResults {
75 _task_handle: (self.trigger_search)(cx),
76 rx: self.results,
77 }
78 }
79 pub fn matching_buffers(self, cx: &mut App) -> SearchResults<Entity<Buffer>> {
80 SearchResults {
81 _task_handle: (self.trigger_search)(cx),
82 rx: self.matching_buffers,
83 }
84 }
85}
86
87#[derive(Clone)]
88enum FindSearchCandidates {
89 Local {
90 fs: Arc<dyn Fs>,
91 /// Start off with all paths in project and filter them based on:
92 /// - Include filters
93 /// - Exclude filters
94 /// - Only open buffers
95 /// - Scan ignored files
96 /// Put another way: filter out files that can't match (without looking at file contents)
97 input_paths_rx: Receiver<InputPath>,
98 /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
99 /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
100 confirm_contents_will_match_tx: Sender<MatchingEntry>,
101 confirm_contents_will_match_rx: Receiver<MatchingEntry>,
102 },
103 Remote,
104 OpenBuffersOnly,
105}
106
107impl Search {
108 pub fn local(
109 fs: Arc<dyn Fs>,
110 buffer_store: Entity<BufferStore>,
111 worktree_store: Entity<WorktreeStore>,
112 limit: usize,
113 cx: &mut App,
114 ) -> Self {
115 let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
116 Self {
117 kind: SearchKind::Local { fs, worktrees },
118 buffer_store,
119 worktree_store,
120 limit,
121 }
122 }
123
124 pub(crate) fn remote(
125 buffer_store: Entity<BufferStore>,
126 worktree_store: Entity<WorktreeStore>,
127 limit: usize,
128 client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
129 ) -> Self {
130 Self {
131 kind: SearchKind::Remote {
132 client: client_state.0,
133 remote_id: client_state.1,
134 models: client_state.2,
135 },
136 buffer_store,
137 worktree_store,
138 limit,
139 }
140 }
141 pub(crate) fn open_buffers_only(
142 buffer_store: Entity<BufferStore>,
143 worktree_store: Entity<WorktreeStore>,
144 limit: usize,
145 ) -> Self {
146 Self {
147 kind: SearchKind::OpenBuffersOnly,
148 buffer_store,
149 worktree_store,
150 limit,
151 }
152 }
153
154 pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
155 pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
156 /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
157 /// or full search results.
158 pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
159 let mut open_buffers = HashSet::default();
160 let mut unnamed_buffers = Vec::new();
161 const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
162 let buffers = self.buffer_store.read(cx);
163 for handle in buffers.buffers() {
164 let buffer = handle.read(cx);
165 if !buffers.is_searchable(&buffer.remote_id()) {
166 continue;
167 } else if buffer
168 .file()
169 .is_some_and(|file| file.disk_state().is_deleted())
170 {
171 continue;
172 } else if let Some(entry_id) = buffer.entry_id(cx) {
173 open_buffers.insert(entry_id);
174 } else {
175 self.limit = self.limit.saturating_sub(1);
176 unnamed_buffers.push(handle)
177 };
178 }
179 let open_buffers = Arc::new(open_buffers);
180 let executor = cx.background_executor().clone();
181 let (tx, rx) = unbounded();
182 let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
183 let matching_buffers = grab_buffer_snapshot_rx.clone();
184 let trigger_search = Box::new(move |cx: &mut App| {
185 cx.spawn(async move |cx| {
186 for buffer in unnamed_buffers {
187 _ = grab_buffer_snapshot_tx.send(buffer).await;
188 }
189
190 let (find_all_matches_tx, find_all_matches_rx) =
191 bounded(MAX_CONCURRENT_BUFFER_OPENS);
192 let query = Arc::new(query);
193 let (candidate_searcher, tasks) = match self.kind {
194 SearchKind::OpenBuffersOnly => {
195 let open_buffers = cx.update(|cx| self.all_loaded_buffers(&query, cx));
196 let fill_requests = cx
197 .background_spawn(async move {
198 for buffer in open_buffers {
199 if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
200 return;
201 }
202 }
203 })
204 .boxed_local();
205 (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
206 }
207 SearchKind::Local {
208 fs,
209 ref mut worktrees,
210 } => {
211 let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
212 unbounded();
213 let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
214 bounded(64);
215 let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
216
217 let (input_paths_tx, input_paths_rx) = unbounded();
218 let tasks = vec![
219 cx.spawn(Self::provide_search_paths(
220 std::mem::take(worktrees),
221 query.clone(),
222 input_paths_tx,
223 sorted_search_results_tx,
224 tx.clone(),
225 ))
226 .boxed_local(),
227 Self::open_buffers(
228 self.buffer_store,
229 get_buffer_for_full_scan_rx,
230 grab_buffer_snapshot_tx,
231 cx.clone(),
232 )
233 .boxed_local(),
234 cx.background_spawn(Self::maintain_sorted_search_results(
235 sorted_search_results_rx,
236 get_buffer_for_full_scan_tx,
237 self.limit,
238 ))
239 .boxed_local(),
240 ];
241 (
242 FindSearchCandidates::Local {
243 fs,
244 confirm_contents_will_match_tx,
245 confirm_contents_will_match_rx,
246 input_paths_rx,
247 },
248 tasks,
249 )
250 }
251 SearchKind::Remote {
252 client,
253 remote_id,
254 models,
255 } => {
256 let (handle, rx) = self
257 .buffer_store
258 .update(cx, |this, _| this.register_project_search_result_handle());
259
260 let cancel_ongoing_search = util::defer({
261 let client = client.clone();
262 move || {
263 _ = client.send(proto::FindSearchCandidatesCancelled {
264 project_id: remote_id,
265 handle,
266 });
267 }
268 });
269 let request = client.request(proto::FindSearchCandidates {
270 project_id: remote_id,
271 query: Some(query.to_proto()),
272 limit: self.limit as _,
273 handle,
274 });
275
276 let buffer_store = self.buffer_store;
277 let guard = cx.update(|cx| {
278 Project::retain_remotely_created_models_impl(
279 &models,
280 &buffer_store,
281 &self.worktree_store,
282 cx,
283 )
284 });
285
286 let issue_remote_buffers_request = cx
287 .spawn(async move |cx| {
288 let _ = maybe!(async move {
289 request.await?;
290
291 let (buffer_tx, buffer_rx) = bounded(24);
292
293 let wait_for_remote_buffers = cx.spawn(async move |cx| {
294 while let Ok(buffer_id) = rx.recv().await {
295 let buffer =
296 buffer_store.update(cx, |buffer_store, cx| {
297 buffer_store
298 .wait_for_remote_buffer(buffer_id, cx)
299 });
300 buffer_tx.send(buffer).await?;
301 }
302 anyhow::Ok(())
303 });
304
305 let forward_buffers = cx.background_spawn(async move {
306 while let Ok(buffer) = buffer_rx.recv().await {
307 let _ =
308 grab_buffer_snapshot_tx.send(buffer.await?).await;
309 }
310 anyhow::Ok(())
311 });
312 let (left, right) = futures::future::join(
313 wait_for_remote_buffers,
314 forward_buffers,
315 )
316 .await;
317 left?;
318 right?;
319
320 drop(guard);
321 cancel_ongoing_search.abort();
322 anyhow::Ok(())
323 })
324 .await
325 .log_err();
326 })
327 .boxed_local();
328 (
329 FindSearchCandidates::Remote,
330 vec![issue_remote_buffers_request],
331 )
332 }
333 };
334
335 let should_find_all_matches = !tx.is_closed();
336
337 let _executor = executor.clone();
338 let worker_pool = executor.spawn(async move {
339 let num_cpus = _executor.num_cpus();
340
341 assert!(num_cpus > 0);
342 _executor
343 .scoped(|scope| {
344 let worker_count = (num_cpus - 1).max(1);
345 for _ in 0..worker_count {
346 let worker = Worker {
347 query: query.clone(),
348 open_buffers: open_buffers.clone(),
349 candidates: candidate_searcher.clone(),
350 find_all_matches_rx: find_all_matches_rx.clone(),
351 };
352 scope.spawn(worker.run());
353 }
354
355 drop(find_all_matches_rx);
356 drop(candidate_searcher);
357 })
358 .await;
359 });
360
361 let (sorted_matches_tx, sorted_matches_rx) = unbounded();
362 // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
363 // just the files. *They are using the same stream as the guts of the project search do*.
364 // This means that we cannot grab values off of that stream unless it's strictly needed for making a progress in project search.
365 //
366 // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
367 // in all matches, running that task unconditionally would hinder caller's ability to observe all matching file paths.
368 let buffer_snapshots = if should_find_all_matches {
369 Some(
370 Self::grab_buffer_snapshots(
371 grab_buffer_snapshot_rx,
372 find_all_matches_tx,
373 sorted_matches_tx,
374 cx.clone(),
375 )
376 .boxed_local(),
377 )
378 } else {
379 drop(find_all_matches_tx);
380
381 None
382 };
383 let ensure_matches_are_reported_in_order = if should_find_all_matches {
384 Some(
385 Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
386 .boxed_local(),
387 )
388 } else {
389 drop(tx);
390 None
391 };
392
393 futures::future::join_all(
394 [worker_pool.boxed_local()]
395 .into_iter()
396 .chain(buffer_snapshots)
397 .chain(ensure_matches_are_reported_in_order)
398 .chain(tasks),
399 )
400 .await;
401 })
402 });
403
404 SearchResultsHandle {
405 results: rx,
406 matching_buffers,
407 trigger_search,
408 }
409 }
410
411 fn provide_search_paths(
412 worktrees: Vec<Entity<Worktree>>,
413 query: Arc<SearchQuery>,
414 tx: Sender<InputPath>,
415 results: Sender<oneshot::Receiver<ProjectPath>>,
416 results_tx: Sender<SearchResult>,
417 ) -> impl AsyncFnOnce(&mut AsyncApp) {
418 async move |cx| {
419 _ = maybe!(async move {
420 let gitignored_tracker = PathInclusionMatcher::new(query.clone());
421 let include_ignored = query.include_ignored();
422 for worktree in worktrees {
423 let scan_complete = worktree.read_with(cx, |worktree, _| {
424 worktree.as_local().map(|local| local.scan_complete())
425 });
426 if let Some(scan_complete) = scan_complete {
427 _ = results_tx.send(SearchResult::WaitingForScan).await;
428 scan_complete.await;
429 }
430
431 let (mut snapshot, worktree_settings) = worktree
432 .read_with(cx, |this, _| {
433 Some((this.snapshot(), this.as_local()?.settings()))
434 })
435 .context("The worktree is not local")?;
436 if query.include_ignored() {
437 // Pre-fetch all of the ignored directories as they're going to be searched.
438 let mut entries_to_refresh = vec![];
439
440 for entry in snapshot.entries(query.include_ignored(), 0) {
441 if gitignored_tracker.should_scan_gitignored_dir(
442 entry,
443 &snapshot,
444 &worktree_settings,
445 ) {
446 entries_to_refresh.push(entry.path.clone());
447 }
448 }
449 let barrier = worktree.update(cx, |this, _| {
450 let local = this.as_local_mut()?;
451 let barrier = entries_to_refresh
452 .into_iter()
453 .map(|path| local.add_path_prefix_to_scan(path).into_future())
454 .collect::<Vec<_>>();
455 Some(barrier)
456 });
457 if let Some(barriers) = barrier {
458 futures::future::join_all(barriers).await;
459 }
460 snapshot = worktree.read_with(cx, |this, _| this.snapshot());
461 }
462 let tx = tx.clone();
463 let results = results.clone();
464
465 cx.background_executor()
466 .spawn(async move {
467 for entry in snapshot.files(include_ignored, 0) {
468 let (should_scan_tx, should_scan_rx) = oneshot::channel();
469
470 let Ok(_) = tx
471 .send(InputPath {
472 entry: entry.clone(),
473 snapshot: snapshot.clone(),
474 should_scan_tx,
475 })
476 .await
477 else {
478 return;
479 };
480 if results.send(should_scan_rx).await.is_err() {
481 return;
482 };
483 }
484 })
485 .await;
486 }
487 anyhow::Ok(())
488 })
489 .await;
490 }
491 }
492
493 async fn maintain_sorted_search_results(
494 rx: Receiver<oneshot::Receiver<ProjectPath>>,
495 paths_for_full_scan: Sender<ProjectPath>,
496 limit: usize,
497 ) {
498 let mut rx = pin!(rx);
499 let mut matched = 0;
500 while let Some(mut next_path_result) = rx.next().await {
501 let Some(successful_path) = next_path_result.next().await else {
502 // This file did not produce a match, hence skip it.
503 continue;
504 };
505 if paths_for_full_scan.send(successful_path).await.is_err() {
506 return;
507 };
508 matched += 1;
509 if matched >= limit {
510 break;
511 }
512 }
513 }
514
515 /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
516 async fn open_buffers(
517 buffer_store: Entity<BufferStore>,
518 rx: Receiver<ProjectPath>,
519 find_all_matches_tx: Sender<Entity<Buffer>>,
520 mut cx: AsyncApp,
521 ) {
522 let mut rx = pin!(rx.ready_chunks(64));
523 _ = maybe!(async move {
524 while let Some(requested_paths) = rx.next().await {
525 let mut buffers = buffer_store.update(&mut cx, |this, cx| {
526 requested_paths
527 .into_iter()
528 .map(|path| this.open_buffer(path, cx))
529 .collect::<FuturesOrdered<_>>()
530 });
531
532 while let Some(buffer) = buffers.next().await {
533 if let Some(buffer) = buffer.log_err() {
534 find_all_matches_tx.send(buffer).await?;
535 }
536 }
537 }
538 Result::<_, anyhow::Error>::Ok(())
539 })
540 .await;
541 }
542
543 async fn grab_buffer_snapshots(
544 rx: Receiver<Entity<Buffer>>,
545 find_all_matches_tx: Sender<(
546 Entity<Buffer>,
547 BufferSnapshot,
548 oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
549 )>,
550 results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
551 mut cx: AsyncApp,
552 ) {
553 _ = maybe!(async move {
554 while let Ok(buffer) = rx.recv().await {
555 let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot());
556 let (tx, rx) = oneshot::channel();
557 find_all_matches_tx.send((buffer, snapshot, tx)).await?;
558 results.send(rx).await?;
559 }
560 debug_assert!(rx.is_empty());
561 Result::<_, anyhow::Error>::Ok(())
562 })
563 .await;
564 }
565
566 async fn ensure_matched_ranges_are_reported_in_order(
567 rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
568 tx: Sender<SearchResult>,
569 ) {
570 use postage::stream::Stream;
571 _ = maybe!(async move {
572 let mut matched_buffers = 0;
573 let mut matches = 0;
574 while let Ok(mut next_buffer_matches) = rx.recv().await {
575 let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
576 continue;
577 };
578
579 if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
580 || matches > Search::MAX_SEARCH_RESULT_RANGES
581 {
582 _ = tx.send(SearchResult::LimitReached).await;
583 break;
584 }
585 matched_buffers += 1;
586 matches += ranges.len();
587
588 _ = tx.send(SearchResult::Buffer { buffer, ranges }).await?;
589 }
590 anyhow::Ok(())
591 })
592 .await;
593 }
594
595 fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
596 let worktree_store = self.worktree_store.read(cx);
597 let mut buffers = search_query
598 .buffers()
599 .into_iter()
600 .flatten()
601 .filter(|buffer| {
602 let b = buffer.read(cx);
603 if let Some(file) = b.file() {
604 if file.disk_state().is_deleted() {
605 return false;
606 }
607 if !search_query.match_path(file.path()) {
608 return false;
609 }
610 if !search_query.include_ignored()
611 && let Some(entry) = b
612 .entry_id(cx)
613 .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
614 && entry.is_ignored
615 {
616 return false;
617 }
618 }
619 true
620 })
621 .cloned()
622 .collect::<Vec<_>>();
623 buffers.sort_by(|a, b| {
624 let a = a.read(cx);
625 let b = b.read(cx);
626 match (a.file(), b.file()) {
627 (None, None) => a.remote_id().cmp(&b.remote_id()),
628 (None, Some(_)) => std::cmp::Ordering::Less,
629 (Some(_), None) => std::cmp::Ordering::Greater,
630 (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
631 }
632 });
633
634 buffers
635 }
636}
637
638struct Worker {
639 query: Arc<SearchQuery>,
640 open_buffers: Arc<HashSet<ProjectEntryId>>,
641 candidates: FindSearchCandidates,
642 /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
643 /// Then, when you're done, share them via the channel you were given.
644 find_all_matches_rx: Receiver<(
645 Entity<Buffer>,
646 BufferSnapshot,
647 oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
648 )>,
649}
650
651impl Worker {
652 async fn run(self) {
653 let (
654 input_paths_rx,
655 confirm_contents_will_match_rx,
656 mut confirm_contents_will_match_tx,
657 fs,
658 ) = match self.candidates {
659 FindSearchCandidates::Local {
660 fs,
661 input_paths_rx,
662 confirm_contents_will_match_rx,
663 confirm_contents_will_match_tx,
664 } => (
665 input_paths_rx,
666 confirm_contents_will_match_rx,
667 confirm_contents_will_match_tx,
668 Some(fs),
669 ),
670 FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => {
671 (unbounded().1, unbounded().1, unbounded().0, None)
672 }
673 };
674 // WorkerA: grabs a request for "find all matches in file/a" <- takes 5 minutes
675 // right after: WorkerB: grabs a request for "find all matches in file/b" <- takes 5 seconds
676 let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
677 let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
678 let mut scan_path = pin!(input_paths_rx.fuse());
679
680 loop {
681 let handler = RequestHandler {
682 query: &self.query,
683 open_entries: &self.open_buffers,
684 fs: fs.as_deref(),
685 confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
686 };
687 // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
688 // steps straight away. Another worker might be about to produce a value that will
689 // be pushed there, thus we'll replace current worker's pipe with a dummy one.
690 // That way, we'll only ever close a next-stage channel when ALL workers do so.
691 select_biased! {
692 find_all_matches = find_all_matches.next() => {
693 let Some(matches) = find_all_matches else {
694 continue;
695 };
696 handler.handle_find_all_matches(matches).await;
697 },
698 find_first_match = find_first_match.next() => {
699 if let Some(buffer_with_at_least_one_match) = find_first_match {
700 handler.handle_find_first_match(buffer_with_at_least_one_match).await;
701 }
702 },
703 scan_path = scan_path.next() => {
704 if let Some(path_to_scan) = scan_path {
705 handler.handle_scan_path(path_to_scan).await;
706 } else {
707 // If we're the last worker to notice that this is not producing values, close the upstream.
708 confirm_contents_will_match_tx = bounded(1).0;
709 }
710
711 }
712 complete => {
713 break
714 },
715
716 }
717 }
718 }
719}
720
721struct RequestHandler<'worker> {
722 query: &'worker SearchQuery,
723 fs: Option<&'worker dyn Fs>,
724 open_entries: &'worker HashSet<ProjectEntryId>,
725 confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
726}
727
728impl RequestHandler<'_> {
729 async fn handle_find_all_matches(
730 &self,
731 (buffer, snapshot, mut report_matches): (
732 Entity<Buffer>,
733 BufferSnapshot,
734 oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
735 ),
736 ) {
737 let ranges = self
738 .query
739 .search(&snapshot, None)
740 .await
741 .iter()
742 .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
743 .collect::<Vec<_>>();
744
745 _ = report_matches.send((buffer, ranges)).await;
746 }
747
748 async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
749 async move {
750 let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
751 let Some(file) = self
752 .fs
753 .context("Trying to query filesystem in remote project search")?
754 .open_sync(&abs_path)
755 .await
756 .log_err()
757 else {
758 return anyhow::Ok(());
759 };
760
761 let mut file = BufReader::new(file);
762 let file_start = file.fill_buf()?;
763
764 if let Err(Some(starting_position)) =
765 std::str::from_utf8(file_start).map_err(|e| e.error_len())
766 {
767 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
768 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
769 log::debug!(
770 "Invalid UTF-8 sequence in file {abs_path:?} \
771 at byte position {starting_position}"
772 );
773 return Ok(());
774 }
775
776 if self.query.detect(file).await.unwrap_or(false) {
777 // Yes, we should scan the whole file.
778 entry.should_scan_tx.send(entry.path).await?;
779 }
780 Ok(())
781 }
782 .await
783 .ok();
784 }
785
786 async fn handle_scan_path(&self, req: InputPath) {
787 _ = maybe!(async move {
788 let InputPath {
789 entry,
790 snapshot,
791 mut should_scan_tx,
792 } = req;
793
794 if entry.is_fifo || !entry.is_file() {
795 return Ok(());
796 }
797
798 if self.query.filters_path() {
799 let matched_path = if self.query.match_full_paths() {
800 let mut full_path = snapshot.root_name().to_owned();
801 full_path.push(&entry.path);
802 self.query.match_path(&full_path)
803 } else {
804 self.query.match_path(&entry.path)
805 };
806 if !matched_path {
807 return Ok(());
808 }
809 }
810
811 if self.open_entries.contains(&entry.id) {
812 // The buffer is already in memory and that's the version we want to scan;
813 // hence skip the dilly-dally and look for all matches straight away.
814 should_scan_tx
815 .send(ProjectPath {
816 worktree_id: snapshot.id(),
817 path: entry.path.clone(),
818 })
819 .await?;
820 } else {
821 self.confirm_contents_will_match_tx
822 .send(MatchingEntry {
823 should_scan_tx: should_scan_tx,
824 worktree_root: snapshot.abs_path().clone(),
825 path: ProjectPath {
826 worktree_id: snapshot.id(),
827 path: entry.path.clone(),
828 },
829 })
830 .await?;
831 }
832
833 anyhow::Ok(())
834 })
835 .await;
836 }
837}
838
839struct InputPath {
840 entry: Entry,
841 snapshot: Snapshot,
842 should_scan_tx: oneshot::Sender<ProjectPath>,
843}
844
845struct MatchingEntry {
846 worktree_root: Arc<Path>,
847 path: ProjectPath,
848 should_scan_tx: oneshot::Sender<ProjectPath>,
849}
850
851/// This struct encapsulates the logic to decide whether a given gitignored directory should be
852/// scanned based on include/exclude patterns of a search query (as include/exclude parameters may match paths inside it).
853/// It is kind-of doing an inverse of glob. Given a glob pattern like `src/**/` and a parent path like `src`, we need to decide whether the parent
854/// may contain glob hits.
855pub struct PathInclusionMatcher {
856 included: BTreeSet<PathBuf>,
857 query: Arc<SearchQuery>,
858}
859
860impl PathInclusionMatcher {
861 pub fn new(query: Arc<SearchQuery>) -> Self {
862 let mut included = BTreeSet::new();
863 // To do an inverse glob match, we split each glob into it's prefix and the glob part.
864 // For example, `src/**/*.rs` becomes `src/` and `**/*.rs`. The glob part gets dropped.
865 // Then, when checking whether a given directory should be scanned, we check whether it is a non-empty substring of any glob prefix.
866 if query.filters_path() {
867 included.extend(
868 query
869 .files_to_include()
870 .sources()
871 .flat_map(|glob| Some(wax::Glob::new(glob).ok()?.partition().0)),
872 );
873 }
874 Self { included, query }
875 }
876
877 pub fn should_scan_gitignored_dir(
878 &self,
879 entry: &Entry,
880 snapshot: &Snapshot,
881 worktree_settings: &WorktreeSettings,
882 ) -> bool {
883 if !entry.is_ignored || !entry.kind.is_unloaded() {
884 return false;
885 }
886 if !self.query.include_ignored() {
887 return false;
888 }
889 if worktree_settings.is_path_excluded(&entry.path) {
890 return false;
891 }
892 if !self.query.filters_path() {
893 return true;
894 }
895
896 let as_abs_path = LazyCell::new(move || snapshot.absolutize(&entry.path));
897 let entry_path = &entry.path;
898 // 3. Check Exclusions (Pruning)
899 // If the current path is a child of an excluded path, we stop.
900 let is_excluded = self.path_is_definitely_excluded(&entry_path, snapshot);
901
902 if is_excluded {
903 return false;
904 }
905
906 // 4. Check Inclusions (Traversal)
907 if self.included.is_empty() {
908 return true;
909 }
910
911 // We scan if the current path is a descendant of an include prefix
912 // OR if the current path is an ancestor of an include prefix (we need to go deeper to find it).
913 let is_included = self.included.iter().any(|prefix| {
914 let (prefix_matches_entry, entry_matches_prefix) = if prefix.is_absolute() {
915 (
916 prefix.starts_with(&**as_abs_path),
917 as_abs_path.starts_with(prefix),
918 )
919 } else {
920 RelPath::new(prefix, snapshot.path_style()).map_or((false, false), |prefix| {
921 (
922 prefix.starts_with(entry_path),
923 entry_path.starts_with(&prefix),
924 )
925 })
926 };
927
928 // Logic:
929 // 1. entry_matches_prefix: We are inside the target zone (e.g. glob: src/, current: src/lib/). Keep scanning.
930 // 2. prefix_matches_entry: We are above the target zone (e.g. glob: src/foo/, current: src/). Keep scanning to reach foo.
931 prefix_matches_entry || entry_matches_prefix
932 });
933
934 is_included
935 }
936 fn path_is_definitely_excluded(&self, path: &RelPath, snapshot: &Snapshot) -> bool {
937 if !self.query.files_to_exclude().sources().next().is_none() {
938 let mut path = if self.query.match_full_paths() {
939 let mut full_path = snapshot.root_name().to_owned();
940 full_path.push(path);
941 full_path
942 } else {
943 path.to_owned()
944 };
945 loop {
946 if self.query.files_to_exclude().is_match(&path) {
947 return true;
948 } else if !path.pop() {
949 return false;
950 }
951 }
952 } else {
953 false
954 }
955 }
956}
957
/// Message on the flush channel of [`AdaptiveBatcher`]: `true` asks the batching task to flush
/// and exit, `false` (sent by the delay timer) only flushes the current batch.
type IsTerminating = bool;
959/// Adaptive batcher that starts eager (small batches) and grows batch size
960/// when items arrive quickly, reducing RPC overhead while preserving low latency
961/// for slow streams.
962pub struct AdaptiveBatcher<T> {
963 items: Sender<T>,
964 flush_batch: Sender<IsTerminating>,
965 _batch_task: Task<()>,
966}
967
968impl<T: 'static + Send> AdaptiveBatcher<T> {
969 pub fn new(cx: &BackgroundExecutor) -> (Self, Receiver<Vec<T>>) {
970 let (items, rx) = unbounded();
971 let (batch_tx, batch_rx) = unbounded();
972 let (flush_batch_tx, flush_batch_rx) = unbounded();
973 let flush_batch = flush_batch_tx.clone();
974 let executor = cx.clone();
975 let _batch_task = cx.spawn_with_priority(gpui::Priority::High, async move {
976 let mut current_batch = vec![];
977 let mut items_produced_so_far = 0_u64;
978
979 let mut _schedule_flush_after_delay: Option<Task<()>> = None;
980 let _time_elapsed_since_start_of_search = std::time::Instant::now();
981 let mut flush = pin!(flush_batch_rx);
982 let mut terminating = false;
983 loop {
984 select_biased! {
985 item = rx.recv().fuse() => {
986 match item {
987 Ok(new_item) => {
988 let is_fresh_batch = current_batch.is_empty();
989 items_produced_so_far += 1;
990 current_batch.push(new_item);
991 if is_fresh_batch {
992 // Chosen arbitrarily based on some experimentation with plots.
993 let desired_duration_ms = (20 * (items_produced_so_far + 2).ilog2() as u64).min(300);
994 let desired_duration = Duration::from_millis(desired_duration_ms);
995 let _executor = executor.clone();
996 let _flush = flush_batch_tx.clone();
997 let new_timer = executor.spawn_with_priority(Priority::High, async move {
998 _executor.timer(desired_duration).await;
999 _ = _flush.send(false).await;
1000 });
1001 _schedule_flush_after_delay = Some(new_timer);
1002 }
1003 }
1004 Err(_) => {
1005 // Items channel closed - send any remaining batch before exiting
1006 if !current_batch.is_empty() {
1007 _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
1008 }
1009 break;
1010 }
1011 }
1012 }
1013 should_break_afterwards = flush.next() => {
1014 if !current_batch.is_empty() {
1015 _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
1016 _schedule_flush_after_delay = None;
1017 }
1018 if should_break_afterwards.unwrap_or_default() {
1019 terminating = true;
1020 }
1021 }
1022 complete => {
1023 break;
1024 }
1025 }
1026 if terminating {
1027 // Drain any remaining items before exiting
1028 while let Ok(new_item) = rx.try_recv() {
1029 current_batch.push(new_item);
1030 }
1031 if !current_batch.is_empty() {
1032 _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
1033 }
1034 break;
1035 }
1036 }
1037 });
1038 let this = Self {
1039 items,
1040 _batch_task,
1041 flush_batch,
1042 };
1043 (this, batch_rx)
1044 }
1045
1046 pub async fn push(&self, item: T) {
1047 _ = self.items.send(item).await;
1048 }
1049
1050 pub async fn flush(self) {
1051 _ = self.flush_batch.send(true).await;
1052 self._batch_task.await;
1053 }
1054}