project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use anyhow::Context;
 12use collections::HashSet;
 13use fs::Fs;
 14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
 15use gpui::{App, AppContext, AsyncApp, Entity, Task};
 16use language::{Buffer, BufferSnapshot};
 17use parking_lot::Mutex;
 18use postage::oneshot;
 19use rpc::{AnyProtoClient, proto};
 20use smol::{
 21    channel::{Receiver, Sender, bounded, unbounded},
 22    future::FutureExt,
 23};
 24
 25use text::BufferId;
 26use util::{ResultExt, maybe, paths::compare_rel_paths};
 27use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
 28
 29use crate::{
 30    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
 31    buffer_store::BufferStore,
 32    search::{SearchQuery, SearchResult},
 33    worktree_store::WorktreeStore,
 34};
 35
 36pub struct Search {
 37    buffer_store: Entity<BufferStore>,
 38    worktree_store: Entity<WorktreeStore>,
 39    limit: usize,
 40    kind: SearchKind,
 41}
 42
 43/// Represents search setup, before it is actually kicked off with Search::into_results
 44enum SearchKind {
 45    /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match.
 46    Local {
 47        fs: Arc<dyn Fs>,
 48        worktrees: Vec<Entity<Worktree>>,
 49    },
 50    /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
 51    Remote {
 52        client: AnyProtoClient,
 53        remote_id: u64,
 54        models: Arc<Mutex<RemotelyCreatedModels>>,
 55    },
 56    /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host.
 57    OpenBuffersOnly,
 58}
 59
 60/// Represents results of project search and allows one to either obtain match positions OR
 61/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for
 62/// at most one match in each file.
 63#[must_use]
 64pub struct SearchResultsHandle {
 65    results: Receiver<SearchResult>,
 66    matching_buffers: Receiver<Entity<Buffer>>,
 67    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
 68}
 69
 70impl SearchResultsHandle {
 71    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
 72        (self.trigger_search)(cx).detach();
 73        self.results
 74    }
 75    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
 76        (self.trigger_search)(cx).detach();
 77        self.matching_buffers
 78    }
 79}
 80
 81#[derive(Clone)]
 82enum FindSearchCandidates {
 83    Local {
 84        fs: Arc<dyn Fs>,
 85        /// Start off with all paths in project and filter them based on:
 86        /// - Include filters
 87        /// - Exclude filters
 88        /// - Only open buffers
 89        /// - Scan ignored files
 90        /// Put another way: filter out files that can't match (without looking at file contents)
 91        input_paths_rx: Receiver<InputPath>,
 92        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
 93        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
 94        confirm_contents_will_match_tx: Sender<MatchingEntry>,
 95        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
 96        /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
 97        /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
 98        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
 99    },
100    Remote,
101    OpenBuffersOnly,
102}
103
104impl Search {
105    pub fn local(
106        fs: Arc<dyn Fs>,
107        buffer_store: Entity<BufferStore>,
108        worktree_store: Entity<WorktreeStore>,
109        limit: usize,
110        cx: &mut App,
111    ) -> Self {
112        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
113        Self {
114            kind: SearchKind::Local { fs, worktrees },
115            buffer_store,
116            worktree_store,
117            limit,
118        }
119    }
120
121    pub(crate) fn remote(
122        buffer_store: Entity<BufferStore>,
123        worktree_store: Entity<WorktreeStore>,
124        limit: usize,
125        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
126    ) -> Self {
127        Self {
128            kind: SearchKind::Remote {
129                client: client_state.0,
130                remote_id: client_state.1,
131                models: client_state.2,
132            },
133            buffer_store,
134            worktree_store,
135            limit,
136        }
137    }
138    pub(crate) fn open_buffers_only(
139        buffer_store: Entity<BufferStore>,
140        worktree_store: Entity<WorktreeStore>,
141        limit: usize,
142    ) -> Self {
143        Self {
144            kind: SearchKind::OpenBuffersOnly,
145            buffer_store,
146            worktree_store,
147            limit,
148        }
149    }
150
151    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
152    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
153    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
154    /// or full search results.
155    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
156        let mut open_buffers = HashSet::default();
157        let mut unnamed_buffers = Vec::new();
158        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
159        let buffers = self.buffer_store.read(cx);
160        for handle in buffers.buffers() {
161            let buffer = handle.read(cx);
162            if !buffers.is_searchable(&buffer.remote_id()) {
163                continue;
164            } else if let Some(entry_id) = buffer.entry_id(cx) {
165                open_buffers.insert(entry_id);
166            } else {
167                self.limit = self.limit.saturating_sub(1);
168                unnamed_buffers.push(handle)
169            };
170        }
171        let executor = cx.background_executor().clone();
172        let (tx, rx) = unbounded();
173        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
174        let matching_buffers = grab_buffer_snapshot_rx.clone();
175        let trigger_search = Box::new(move |cx: &mut App| {
176            cx.spawn(async move |cx| {
177                for buffer in unnamed_buffers {
178                    _ = grab_buffer_snapshot_tx.send(buffer).await;
179                }
180
181                let (find_all_matches_tx, find_all_matches_rx) =
182                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
183
184                let (candidate_searcher, tasks) = match self.kind {
185                    SearchKind::OpenBuffersOnly => {
186                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
187                        else {
188                            return;
189                        };
190                        let fill_requests = cx
191                            .background_spawn(async move {
192                                for buffer in open_buffers {
193                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
194                                        return;
195                                    }
196                                }
197                            })
198                            .boxed_local();
199                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
200                    }
201                    SearchKind::Local {
202                        fs,
203                        ref mut worktrees,
204                    } => {
205                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
206                            unbounded();
207                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
208                            bounded(64);
209                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
210
211                        let (input_paths_tx, input_paths_rx) = unbounded();
212
213                        let tasks = vec![
214                            cx.spawn(Self::provide_search_paths(
215                                std::mem::take(worktrees),
216                                query.include_ignored(),
217                                input_paths_tx,
218                                sorted_search_results_tx,
219                            ))
220                            .boxed_local(),
221                            Self::open_buffers(
222                                &self.buffer_store,
223                                get_buffer_for_full_scan_rx,
224                                grab_buffer_snapshot_tx,
225                                cx.clone(),
226                            )
227                            .boxed_local(),
228                            cx.background_spawn(Self::maintain_sorted_search_results(
229                                sorted_search_results_rx,
230                                get_buffer_for_full_scan_tx.clone(),
231                                self.limit,
232                            ))
233                            .boxed_local(),
234                        ];
235                        (
236                            FindSearchCandidates::Local {
237                                fs,
238                                get_buffer_for_full_scan_tx,
239                                confirm_contents_will_match_tx,
240                                confirm_contents_will_match_rx,
241                                input_paths_rx,
242                            },
243                            tasks,
244                        )
245                    }
246                    SearchKind::Remote {
247                        client,
248                        remote_id,
249                        models,
250                    } => {
251                        let request = client.request(proto::FindSearchCandidates {
252                            project_id: remote_id,
253                            query: Some(query.to_proto()),
254                            limit: self.limit as _,
255                        });
256                        let Ok(guard) = cx.update(|cx| {
257                            Project::retain_remotely_created_models_impl(
258                                &models,
259                                &self.buffer_store,
260                                &self.worktree_store,
261                                cx,
262                            )
263                        }) else {
264                            return;
265                        };
266                        let buffer_store = self.buffer_store.downgrade();
267                        let issue_remote_buffers_request = cx
268                            .spawn(async move |cx| {
269                                let _ = maybe!(async move {
270                                    let response = request.await?;
271
272                                    for buffer_id in response.buffer_ids {
273                                        let buffer_id = BufferId::new(buffer_id)?;
274                                        let buffer = buffer_store
275                                            .update(cx, |buffer_store, cx| {
276                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
277                                            })?
278                                            .await?;
279                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
280                                    }
281
282                                    drop(guard);
283                                    anyhow::Ok(())
284                                })
285                                .await
286                                .log_err();
287                            })
288                            .boxed_local();
289                        (
290                            FindSearchCandidates::Remote,
291                            vec![issue_remote_buffers_request],
292                        )
293                    }
294                };
295
296                let matches_count = AtomicUsize::new(0);
297                let matched_buffer_count = AtomicUsize::new(0);
298
299                let worker_pool = executor.scoped(|scope| {
300                    let num_cpus = executor.num_cpus();
301
302                    assert!(num_cpus > 0);
303                    for _ in 0..executor.num_cpus() - 1 {
304                        let worker = Worker {
305                            query: &query,
306                            open_buffers: &open_buffers,
307                            matched_buffer_count: &matched_buffer_count,
308                            matches_count: &matches_count,
309                            candidates: candidate_searcher.clone(),
310                            find_all_matches_rx: find_all_matches_rx.clone(),
311                            publish_matches: tx.clone(),
312                        };
313                        scope.spawn(worker.run());
314                    }
315                    drop(tx);
316                    drop(find_all_matches_rx);
317                    drop(candidate_searcher);
318                });
319
320                let buffer_snapshots = Self::grab_buffer_snapshots(
321                    grab_buffer_snapshot_rx,
322                    find_all_matches_tx,
323                    cx.clone(),
324                );
325                futures::future::join_all(
326                    [worker_pool.boxed_local(), buffer_snapshots.boxed_local()]
327                        .into_iter()
328                        .chain(tasks),
329                )
330                .await;
331            })
332        });
333
334        SearchResultsHandle {
335            results: rx,
336            matching_buffers,
337            trigger_search,
338        }
339    }
340
341    fn provide_search_paths(
342        worktrees: Vec<Entity<Worktree>>,
343        include_ignored: bool,
344        tx: Sender<InputPath>,
345        results: Sender<oneshot::Receiver<ProjectPath>>,
346    ) -> impl AsyncFnOnce(&mut AsyncApp) {
347        async move |cx| {
348            _ = maybe!(async move {
349                for worktree in worktrees {
350                    let (mut snapshot, worktree_settings) = worktree
351                        .read_with(cx, |this, _| {
352                            Some((this.snapshot(), this.as_local()?.settings()))
353                        })?
354                        .context("The worktree is not local")?;
355                    if include_ignored {
356                        // Pre-fetch all of the ignored directories as they're going to be searched.
357                        let mut entries_to_refresh = vec![];
358                        for entry in snapshot.entries(include_ignored, 0) {
359                            if entry.is_ignored && entry.kind.is_unloaded() {
360                                if !worktree_settings.is_path_excluded(&entry.path) {
361                                    entries_to_refresh.push(entry.path.clone());
362                                }
363                            }
364                        }
365                        let barrier = worktree.update(cx, |this, _| {
366                            let local = this.as_local_mut()?;
367                            let barrier = entries_to_refresh
368                                .into_iter()
369                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
370                                .collect::<Vec<_>>();
371                            Some(barrier)
372                        })?;
373                        if let Some(barriers) = barrier {
374                            futures::future::join_all(barriers).await;
375                        }
376                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
377                    }
378                    cx.background_executor()
379                        .scoped(|scope| {
380                            scope.spawn(async {
381                                for entry in snapshot.files(include_ignored, 0) {
382                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
383                                    let Ok(_) = tx
384                                        .send(InputPath {
385                                            entry: entry.clone(),
386                                            snapshot: snapshot.clone(),
387                                            should_scan_tx,
388                                        })
389                                        .await
390                                    else {
391                                        return;
392                                    };
393                                    if results.send(should_scan_rx).await.is_err() {
394                                        return;
395                                    };
396                                }
397                            })
398                        })
399                        .await;
400                }
401                anyhow::Ok(())
402            })
403            .await;
404        }
405    }
406
407    async fn maintain_sorted_search_results(
408        rx: Receiver<oneshot::Receiver<ProjectPath>>,
409        paths_for_full_scan: Sender<ProjectPath>,
410        limit: usize,
411    ) {
412        let mut rx = pin!(rx);
413        let mut matched = 0;
414        while let Some(mut next_path_result) = rx.next().await {
415            let Some(successful_path) = next_path_result.next().await else {
416                // This math did not produce a match, hence skip it.
417                continue;
418            };
419            if paths_for_full_scan.send(successful_path).await.is_err() {
420                return;
421            };
422            matched += 1;
423            if matched >= limit {
424                break;
425            }
426        }
427    }
428
429    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
430    async fn open_buffers(
431        buffer_store: &Entity<BufferStore>,
432        rx: Receiver<ProjectPath>,
433        find_all_matches_tx: Sender<Entity<Buffer>>,
434        mut cx: AsyncApp,
435    ) {
436        let mut rx = pin!(rx.ready_chunks(64));
437        _ = maybe!(async move {
438            while let Some(requested_paths) = rx.next().await {
439                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
440                    requested_paths
441                        .into_iter()
442                        .map(|path| this.open_buffer(path, cx))
443                        .collect::<FuturesOrdered<_>>()
444                })?;
445
446                while let Some(buffer) = buffers.next().await {
447                    if let Some(buffer) = buffer.log_err() {
448                        find_all_matches_tx.send(buffer).await?;
449                    }
450                }
451            }
452            Result::<_, anyhow::Error>::Ok(())
453        })
454        .await;
455    }
456
457    async fn grab_buffer_snapshots(
458        rx: Receiver<Entity<Buffer>>,
459        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
460        mut cx: AsyncApp,
461    ) {
462        _ = maybe!(async move {
463            while let Ok(buffer) = rx.recv().await {
464                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
465                find_all_matches_tx.send((buffer, snapshot)).await?;
466            }
467            Result::<_, anyhow::Error>::Ok(())
468        })
469        .await;
470    }
471
472    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
473        let worktree_store = self.worktree_store.read(cx);
474        let mut buffers = search_query
475            .buffers()
476            .into_iter()
477            .flatten()
478            .filter(|buffer| {
479                let b = buffer.read(cx);
480                if let Some(file) = b.file() {
481                    if !search_query.match_path(file.path().as_std_path()) {
482                        return false;
483                    }
484                    if !search_query.include_ignored()
485                        && let Some(entry) = b
486                            .entry_id(cx)
487                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
488                        && entry.is_ignored
489                    {
490                        return false;
491                    }
492                }
493                true
494            })
495            .cloned()
496            .collect::<Vec<_>>();
497        buffers.sort_by(|a, b| {
498            let a = a.read(cx);
499            let b = b.read(cx);
500            match (a.file(), b.file()) {
501                (None, None) => a.remote_id().cmp(&b.remote_id()),
502                (None, Some(_)) => std::cmp::Ordering::Less,
503                (Some(_), None) => std::cmp::Ordering::Greater,
504                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
505            }
506        });
507
508        buffers
509    }
510}
511
512struct Worker<'search> {
513    query: &'search SearchQuery,
514    matched_buffer_count: &'search AtomicUsize,
515    matches_count: &'search AtomicUsize,
516    open_buffers: &'search HashSet<ProjectEntryId>,
517    candidates: FindSearchCandidates,
518    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
519    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
520    /// Cool, we have results; let's share them with the world.
521    publish_matches: Sender<SearchResult>,
522}
523
524impl Worker<'_> {
525    async fn run(mut self) {
526        let (
527            input_paths_rx,
528            confirm_contents_will_match_rx,
529            mut confirm_contents_will_match_tx,
530            mut get_buffer_for_full_scan_tx,
531            fs,
532        ) = match self.candidates {
533            FindSearchCandidates::Local {
534                fs,
535                input_paths_rx,
536                confirm_contents_will_match_rx,
537                confirm_contents_will_match_tx,
538                get_buffer_for_full_scan_tx,
539            } => (
540                input_paths_rx,
541                confirm_contents_will_match_rx,
542                confirm_contents_will_match_tx,
543                get_buffer_for_full_scan_tx,
544                Some(fs),
545            ),
546            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => (
547                unbounded().1,
548                unbounded().1,
549                unbounded().0,
550                unbounded().0,
551                None,
552            ),
553        };
554        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
555        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
556        let mut scan_path = pin!(input_paths_rx.fuse());
557
558        loop {
559            let handler = RequestHandler {
560                query: self.query,
561                open_entries: &self.open_buffers,
562                fs: fs.as_deref(),
563                matched_buffer_count: self.matched_buffer_count,
564                matches_count: self.matches_count,
565                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
566                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
567                publish_matches: &self.publish_matches,
568            };
569            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
570            // steps straight away. Another worker might be about to produce a value that will
571            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
572            // That way, we'll only ever close a next-stage channel when ALL workers do so.
573            select_biased! {
574                find_all_matches = find_all_matches.next() => {
575
576                    if self.publish_matches.is_closed() {
577                        break;
578                    }
579                    let Some(matches) = find_all_matches else {
580                        self.publish_matches = bounded(1).0;
581                        continue;
582                    };
583                    let result = handler.handle_find_all_matches(matches).await;
584                    if let Some(_should_bail) = result {
585
586                        self.publish_matches = bounded(1).0;
587                        continue;
588                    }
589                },
590                find_first_match = find_first_match.next() => {
591                    if let Some(buffer_with_at_least_one_match) = find_first_match {
592                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
593                    } else {
594                        get_buffer_for_full_scan_tx = bounded(1).0;
595                    }
596
597                },
598                scan_path = scan_path.next() => {
599                    if let Some(path_to_scan) = scan_path {
600                        handler.handle_scan_path(path_to_scan).await;
601                    } else {
602                        // If we're the last worker to notice that this is not producing values, close the upstream.
603                        confirm_contents_will_match_tx = bounded(1).0;
604                    }
605
606                 }
607                 complete => {
608                     break
609                },
610
611            }
612        }
613    }
614}
615
616struct RequestHandler<'worker> {
617    query: &'worker SearchQuery,
618    fs: Option<&'worker dyn Fs>,
619    open_entries: &'worker HashSet<ProjectEntryId>,
620    matched_buffer_count: &'worker AtomicUsize,
621    matches_count: &'worker AtomicUsize,
622
623    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
624    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
625    publish_matches: &'worker Sender<SearchResult>,
626}
627
628struct LimitReached;
629
630impl RequestHandler<'_> {
631    async fn handle_find_all_matches(
632        &self,
633        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
634    ) -> Option<LimitReached> {
635        let ranges = self
636            .query
637            .search(&snapshot, None)
638            .await
639            .iter()
640            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
641            .collect::<Vec<_>>();
642
643        let matched_ranges = ranges.len();
644        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
645            > Search::MAX_SEARCH_RESULT_FILES
646            || self
647                .matches_count
648                .fetch_add(matched_ranges, Ordering::Release)
649                > Search::MAX_SEARCH_RESULT_RANGES
650        {
651            _ = self.publish_matches.send(SearchResult::LimitReached).await;
652            Some(LimitReached)
653        } else {
654            _ = self
655                .publish_matches
656                .send(SearchResult::Buffer { buffer, ranges })
657                .await;
658            None
659        }
660    }
661    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
662        _=maybe!(async move {
663            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
664            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
665                return anyhow::Ok(());
666            };
667
668            let mut file = BufReader::new(file);
669            let file_start = file.fill_buf()?;
670
671            if let Err(Some(starting_position)) =
672            std::str::from_utf8(file_start).map_err(|e| e.error_len())
673            {
674                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
675                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
676                log::debug!(
677                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
678                );
679                return Ok(());
680            }
681
682            if self.query.detect(file).unwrap_or(false) {
683                // Yes, we should scan the whole file.
684                entry.should_scan_tx.send(entry.path).await?;
685            }
686            Ok(())
687        }).await;
688    }
689
690    async fn handle_scan_path(&self, req: InputPath) {
691        _ = maybe!(async move {
692            let InputPath {
693                entry,
694
695                snapshot,
696                should_scan_tx,
697            } = req;
698
699            if entry.is_fifo || !entry.is_file() {
700                return Ok(());
701            }
702
703            if self.query.filters_path() {
704                let matched_path = if self.query.match_full_paths() {
705                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
706                    full_path.push(entry.path.as_std_path());
707                    self.query.match_path(&full_path)
708                } else {
709                    self.query.match_path(entry.path.as_std_path())
710                };
711                if !matched_path {
712                    return Ok(());
713                }
714            }
715
716            if self.open_entries.contains(&entry.id) {
717                // The buffer is already in memory and that's the version we want to scan;
718                // hence skip the dilly-dally and look for all matches straight away.
719                self.get_buffer_for_full_scan_tx
720                    .send(ProjectPath {
721                        worktree_id: snapshot.id(),
722                        path: entry.path.clone(),
723                    })
724                    .await?;
725            } else {
726                self.confirm_contents_will_match_tx
727                    .send(MatchingEntry {
728                        should_scan_tx: should_scan_tx,
729                        worktree_root: snapshot.abs_path().clone(),
730                        path: ProjectPath {
731                            worktree_id: snapshot.id(),
732                            path: entry.path.clone(),
733                        },
734                    })
735                    .await?;
736            }
737
738            anyhow::Ok(())
739        })
740        .await;
741    }
742}
743
744struct InputPath {
745    entry: Entry,
746    snapshot: Snapshot,
747    should_scan_tx: oneshot::Sender<ProjectPath>,
748}
749
750struct MatchingEntry {
751    worktree_root: Arc<Path>,
752    path: ProjectPath,
753    should_scan_tx: oneshot::Sender<ProjectPath>,
754}