project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use anyhow::Context;
 12use collections::HashSet;
 13use fs::Fs;
 14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
 15use gpui::{App, AppContext, AsyncApp, Entity, Task};
 16use language::{Buffer, BufferSnapshot};
 17use parking_lot::Mutex;
 18use postage::oneshot;
 19use rpc::{AnyProtoClient, proto};
 20use smol::{
 21    channel::{Receiver, Sender, bounded, unbounded},
 22    future::FutureExt,
 23};
 24
 25use text::BufferId;
 26use util::{ResultExt, maybe, paths::compare_rel_paths};
 27use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
 28
 29use crate::{
 30    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
 31    buffer_store::BufferStore,
 32    search::{SearchQuery, SearchResult},
 33    worktree_store::WorktreeStore,
 34};
 35
 36pub struct Search {
 37    buffer_store: Entity<BufferStore>,
 38    worktree_store: Entity<WorktreeStore>,
 39    limit: usize,
 40    kind: SearchKind,
 41}
 42
 43enum SearchKind {
 44    Local {
 45        fs: Arc<dyn Fs>,
 46        worktrees: Vec<Entity<Worktree>>,
 47    },
 48    Remote {
 49        client: AnyProtoClient,
 50        remote_id: u64,
 51        models: Arc<Mutex<RemotelyCreatedModels>>,
 52    },
 53    OpenBuffersOnly,
 54}
 55
 56/// Represents results of project search and allows one to either obtain match positions OR
 57/// just the handles to buffers that may match the search.
 58#[must_use]
 59pub struct SearchResultsHandle {
 60    results: Receiver<SearchResult>,
 61    matching_buffers: Receiver<Entity<Buffer>>,
 62    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
 63}
 64
 65impl SearchResultsHandle {
 66    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
 67        (self.trigger_search)(cx).detach();
 68        self.results
 69    }
 70    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
 71        (self.trigger_search)(cx).detach();
 72        self.matching_buffers
 73    }
 74}
 75
 76#[derive(Clone)]
 77enum FindSearchCandidates {
 78    Local {
 79        fs: Arc<dyn Fs>,
 80        /// Start off with all paths in project and filter them based on:
 81        /// - Include filters
 82        /// - Exclude filters
 83        /// - Only open buffers
 84        /// - Scan ignored files
 85        /// Put another way: filter out files that can't match (without looking at file contents)
 86        input_paths_rx: Receiver<InputPath>,
 87        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
 88        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
 89        confirm_contents_will_match_tx: Sender<MatchingEntry>,
 90        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
 91        /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
 92        /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
 93        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
 94    },
 95    Remote,
 96    OpenBuffersOnly,
 97}
 98
 99impl Search {
100    pub fn local(
101        fs: Arc<dyn Fs>,
102        buffer_store: Entity<BufferStore>,
103        worktree_store: Entity<WorktreeStore>,
104        limit: usize,
105        cx: &mut App,
106    ) -> Self {
107        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
108        Self {
109            kind: SearchKind::Local { fs, worktrees },
110            buffer_store,
111            worktree_store,
112            limit,
113        }
114    }
115
116    pub(crate) fn remote(
117        buffer_store: Entity<BufferStore>,
118        worktree_store: Entity<WorktreeStore>,
119        limit: usize,
120        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
121    ) -> Self {
122        Self {
123            kind: SearchKind::Remote {
124                client: client_state.0,
125                remote_id: client_state.1,
126                models: client_state.2,
127            },
128            buffer_store,
129            worktree_store,
130            limit,
131        }
132    }
133    pub(crate) fn open_buffers_only(
134        buffer_store: Entity<BufferStore>,
135        worktree_store: Entity<WorktreeStore>,
136        limit: usize,
137    ) -> Self {
138        Self {
139            kind: SearchKind::OpenBuffersOnly,
140            buffer_store,
141            worktree_store,
142            limit,
143        }
144    }
145
146    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
147    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
148    /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
149    /// or full search results.
150    pub fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
151        let mut open_buffers = HashSet::default();
152        let mut unnamed_buffers = Vec::new();
153        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
154        let buffers = self.buffer_store.read(cx);
155        for handle in buffers.buffers() {
156            let buffer = handle.read(cx);
157            if !buffers.is_searchable(&buffer.remote_id()) {
158                continue;
159            } else if let Some(entry_id) = buffer.entry_id(cx) {
160                open_buffers.insert(entry_id);
161            } else {
162                self.limit -= self.limit.saturating_sub(1);
163                unnamed_buffers.push(handle)
164            };
165        }
166        let executor = cx.background_executor().clone();
167        let (tx, rx) = unbounded();
168        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
169        let matching_buffers = grab_buffer_snapshot_rx.clone();
170        let trigger_search = Box::new(move |cx: &mut App| {
171            cx.spawn(async move |cx| {
172                for buffer in unnamed_buffers {
173                    _ = grab_buffer_snapshot_tx.send(buffer).await;
174                }
175
176                let (find_all_matches_tx, find_all_matches_rx) =
177                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
178
179                let (candidate_searcher, tasks) = match self.kind {
180                    SearchKind::OpenBuffersOnly => {
181                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
182                        else {
183                            return;
184                        };
185                        let fill_requests = cx
186                            .background_spawn(async move {
187                                for buffer in open_buffers {
188                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
189                                        return;
190                                    }
191                                }
192                            })
193                            .boxed_local();
194                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
195                    }
196                    SearchKind::Local {
197                        fs,
198                        ref mut worktrees,
199                    } => {
200                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
201                            unbounded();
202                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
203                            bounded(64);
204                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
205
206                        let (input_paths_tx, input_paths_rx) = unbounded();
207
208                        let tasks = vec![
209                            cx.spawn(Self::provide_search_paths(
210                                std::mem::take(worktrees),
211                                query.include_ignored(),
212                                input_paths_tx,
213                                sorted_search_results_tx,
214                            ))
215                            .boxed_local(),
216                            Self::open_buffers(
217                                &self.buffer_store,
218                                get_buffer_for_full_scan_rx,
219                                grab_buffer_snapshot_tx,
220                                cx.clone(),
221                            )
222                            .boxed_local(),
223                            cx.background_spawn(Self::maintain_sorted_search_results(
224                                sorted_search_results_rx,
225                                get_buffer_for_full_scan_tx.clone(),
226                                self.limit,
227                            ))
228                            .boxed_local(),
229                        ];
230                        (
231                            FindSearchCandidates::Local {
232                                fs,
233                                get_buffer_for_full_scan_tx,
234                                confirm_contents_will_match_tx,
235                                confirm_contents_will_match_rx,
236                                input_paths_rx,
237                            },
238                            tasks,
239                        )
240                    }
241                    SearchKind::Remote {
242                        client,
243                        remote_id,
244                        models,
245                    } => {
246                        let request = client.request(proto::FindSearchCandidates {
247                            project_id: remote_id,
248                            query: Some(query.to_proto()),
249                            limit: self.limit as _,
250                        });
251                        let Ok(guard) = cx.update(|cx| {
252                            Project::retain_remotely_created_models_impl(
253                                &models,
254                                &self.buffer_store,
255                                &self.worktree_store,
256                                cx,
257                            )
258                        }) else {
259                            return;
260                        };
261                        let buffer_store = self.buffer_store.downgrade();
262                        let issue_remote_buffers_request = cx
263                            .spawn(async move |cx| {
264                                let _ = maybe!(async move {
265                                    let response = request.await?;
266
267                                    for buffer_id in response.buffer_ids {
268                                        let buffer_id = BufferId::new(buffer_id)?;
269                                        let buffer = buffer_store
270                                            .update(cx, |buffer_store, cx| {
271                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
272                                            })?
273                                            .await?;
274                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
275                                    }
276
277                                    drop(guard);
278                                    anyhow::Ok(())
279                                })
280                                .await
281                                .log_err();
282                            })
283                            .boxed_local();
284                        (
285                            FindSearchCandidates::Remote,
286                            vec![issue_remote_buffers_request],
287                        )
288                    }
289                };
290
291                let matches_count = AtomicUsize::new(0);
292                let matched_buffer_count = AtomicUsize::new(0);
293
294                let worker_pool = executor.scoped(|scope| {
295                    let num_cpus = executor.num_cpus();
296
297                    assert!(num_cpus > 0);
298                    for _ in 0..executor.num_cpus() - 1 {
299                        let worker = Worker {
300                            query: &query,
301                            open_buffers: &open_buffers,
302                            matched_buffer_count: &matched_buffer_count,
303                            matches_count: &matches_count,
304                            candidates: candidate_searcher.clone(),
305                            find_all_matches_rx: find_all_matches_rx.clone(),
306                            publish_matches: tx.clone(),
307                        };
308                        scope.spawn(worker.run());
309                    }
310                    drop(tx);
311                    drop(find_all_matches_rx);
312                    drop(candidate_searcher);
313                });
314
315                let buffer_snapshots = Self::grab_buffer_snapshots(
316                    grab_buffer_snapshot_rx,
317                    find_all_matches_tx,
318                    cx.clone(),
319                );
320                futures::future::join_all(
321                    [worker_pool.boxed_local(), buffer_snapshots.boxed_local()]
322                        .into_iter()
323                        .chain(tasks),
324                )
325                .await;
326            })
327        });
328
329        SearchResultsHandle {
330            results: rx,
331            matching_buffers,
332            trigger_search,
333        }
334    }
335
336    fn provide_search_paths(
337        worktrees: Vec<Entity<Worktree>>,
338        include_ignored: bool,
339        tx: Sender<InputPath>,
340        results: Sender<oneshot::Receiver<ProjectPath>>,
341    ) -> impl AsyncFnOnce(&mut AsyncApp) {
342        async move |cx| {
343            _ = maybe!(async move {
344                for worktree in worktrees {
345                    let (mut snapshot, worktree_settings) = worktree
346                        .read_with(cx, |this, _| {
347                            Some((this.snapshot(), this.as_local()?.settings()))
348                        })?
349                        .context("The worktree is not local")?;
350                    if include_ignored {
351                        // Pre-fetch all of the ignored directories as they're going to be searched.
352                        let mut entries_to_refresh = vec![];
353                        for entry in snapshot.entries(include_ignored, 0) {
354                            if entry.is_ignored && entry.kind.is_unloaded() {
355                                if !worktree_settings.is_path_excluded(&entry.path) {
356                                    entries_to_refresh.push(entry.path.clone());
357                                }
358                            }
359                        }
360                        let barrier = worktree.update(cx, |this, _| {
361                            let local = this.as_local_mut()?;
362                            let barrier = entries_to_refresh
363                                .into_iter()
364                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
365                                .collect::<Vec<_>>();
366                            Some(barrier)
367                        })?;
368                        if let Some(barriers) = barrier {
369                            futures::future::join_all(barriers).await;
370                        }
371                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
372                    }
373                    cx.background_executor()
374                        .scoped(|scope| {
375                            scope.spawn(async {
376                                for entry in snapshot.files(include_ignored, 0) {
377                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
378                                    let Ok(_) = tx
379                                        .send(InputPath {
380                                            entry: entry.clone(),
381                                            snapshot: snapshot.clone(),
382                                            should_scan_tx,
383                                        })
384                                        .await
385                                    else {
386                                        return;
387                                    };
388                                    if results.send(should_scan_rx).await.is_err() {
389                                        return;
390                                    };
391                                }
392                            })
393                        })
394                        .await;
395                }
396                anyhow::Ok(())
397            })
398            .await;
399        }
400    }
401
402    async fn maintain_sorted_search_results(
403        rx: Receiver<oneshot::Receiver<ProjectPath>>,
404        paths_for_full_scan: Sender<ProjectPath>,
405        limit: usize,
406    ) {
407        let mut rx = pin!(rx);
408        let mut matched = 0;
409        while let Some(mut next_path_result) = rx.next().await {
410            let Some(successful_path) = next_path_result.next().await else {
411                // This math did not produce a match, hence skip it.
412                continue;
413            };
414            if paths_for_full_scan.send(successful_path).await.is_err() {
415                return;
416            };
417            matched += 1;
418            if matched >= limit {
419                break;
420            }
421        }
422    }
423
424    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
425    async fn open_buffers(
426        buffer_store: &Entity<BufferStore>,
427        rx: Receiver<ProjectPath>,
428        find_all_matches_tx: Sender<Entity<Buffer>>,
429        mut cx: AsyncApp,
430    ) {
431        let mut rx = pin!(rx.ready_chunks(64));
432        _ = maybe!(async move {
433            while let Some(requested_paths) = rx.next().await {
434                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
435                    requested_paths
436                        .into_iter()
437                        .map(|path| this.open_buffer(path, cx))
438                        .collect::<FuturesOrdered<_>>()
439                })?;
440
441                while let Some(buffer) = buffers.next().await {
442                    if let Some(buffer) = buffer.log_err() {
443                        find_all_matches_tx.send(buffer).await?;
444                    }
445                }
446            }
447            Result::<_, anyhow::Error>::Ok(())
448        })
449        .await;
450    }
451
452    async fn grab_buffer_snapshots(
453        rx: Receiver<Entity<Buffer>>,
454        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
455        mut cx: AsyncApp,
456    ) {
457        _ = maybe!(async move {
458            while let Ok(buffer) = rx.recv().await {
459                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
460                find_all_matches_tx.send((buffer, snapshot)).await?;
461            }
462            Result::<_, anyhow::Error>::Ok(())
463        })
464        .await;
465    }
466
467    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
468        let worktree_store = self.worktree_store.read(cx);
469        let mut buffers = search_query
470            .buffers()
471            .into_iter()
472            .flatten()
473            .filter(|buffer| {
474                let b = buffer.read(cx);
475                if let Some(file) = b.file() {
476                    if !search_query.match_path(file.path().as_std_path()) {
477                        return false;
478                    }
479                    if !search_query.include_ignored()
480                        && let Some(entry) = b
481                            .entry_id(cx)
482                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
483                        && entry.is_ignored
484                    {
485                        return false;
486                    }
487                }
488                true
489            })
490            .cloned()
491            .collect::<Vec<_>>();
492        buffers.sort_by(|a, b| {
493            let a = a.read(cx);
494            let b = b.read(cx);
495            match (a.file(), b.file()) {
496                (None, None) => a.remote_id().cmp(&b.remote_id()),
497                (None, Some(_)) => std::cmp::Ordering::Less,
498                (Some(_), None) => std::cmp::Ordering::Greater,
499                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
500            }
501        });
502
503        buffers
504    }
505}
506
507struct Worker<'search> {
508    query: &'search SearchQuery,
509    matched_buffer_count: &'search AtomicUsize,
510    matches_count: &'search AtomicUsize,
511    open_buffers: &'search HashSet<ProjectEntryId>,
512    candidates: FindSearchCandidates,
513    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
514    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
515    /// Cool, we have results; let's share them with the world.
516    publish_matches: Sender<SearchResult>,
517}
518
519impl Worker<'_> {
520    async fn run(mut self) {
521        let (
522            input_paths_rx,
523            confirm_contents_will_match_rx,
524            mut confirm_contents_will_match_tx,
525            mut get_buffer_for_full_scan_tx,
526            fs,
527        ) = match self.candidates {
528            FindSearchCandidates::Local {
529                fs,
530                input_paths_rx,
531                confirm_contents_will_match_rx,
532                confirm_contents_will_match_tx,
533                get_buffer_for_full_scan_tx,
534            } => (
535                input_paths_rx,
536                confirm_contents_will_match_rx,
537                confirm_contents_will_match_tx,
538                get_buffer_for_full_scan_tx,
539                Some(fs),
540            ),
541            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => (
542                unbounded().1,
543                unbounded().1,
544                unbounded().0,
545                unbounded().0,
546                None,
547            ),
548        };
549        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
550        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
551        let mut scan_path = pin!(input_paths_rx.fuse());
552
553        loop {
554            let handler = RequestHandler {
555                query: self.query,
556                open_entries: &self.open_buffers,
557                fs: fs.as_deref(),
558                matched_buffer_count: self.matched_buffer_count,
559                matches_count: self.matches_count,
560                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
561                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
562                publish_matches: &self.publish_matches,
563            };
564            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
565            // steps straight away. Another worker might be about to produce a value that will
566            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
567            // That way, we'll only ever close a next-stage channel when ALL workers do so.
568            select_biased! {
569                find_all_matches = find_all_matches.next() => {
570
571                    if self.publish_matches.is_closed() {
572                        break;
573                    }
574                    let Some(matches) = find_all_matches else {
575                        self.publish_matches = bounded(1).0;
576                        continue;
577                    };
578                    let result = handler.handle_find_all_matches(matches).await;
579                    if let Some(_should_bail) = result {
580
581                        self.publish_matches = bounded(1).0;
582                        continue;
583                    }
584                },
585                find_first_match = find_first_match.next() => {
586                    if let Some(buffer_with_at_least_one_match) = find_first_match {
587                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
588                    } else {
589                        get_buffer_for_full_scan_tx = bounded(1).0;
590                    }
591
592                },
593                scan_path = scan_path.next() => {
594                    if let Some(path_to_scan) = scan_path {
595                        handler.handle_scan_path(path_to_scan).await;
596                    } else {
597                        // If we're the last worker to notice that this is not producing values, close the upstream.
598                        confirm_contents_will_match_tx = bounded(1).0;
599                    }
600
601                 }
602                 complete => {
603                     break
604                },
605
606            }
607        }
608    }
609}
610
611struct RequestHandler<'worker> {
612    query: &'worker SearchQuery,
613    fs: Option<&'worker dyn Fs>,
614    open_entries: &'worker HashSet<ProjectEntryId>,
615    matched_buffer_count: &'worker AtomicUsize,
616    matches_count: &'worker AtomicUsize,
617
618    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
619    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
620    publish_matches: &'worker Sender<SearchResult>,
621}
622
623struct LimitReached;
624
625impl RequestHandler<'_> {
626    async fn handle_find_all_matches(
627        &self,
628        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
629    ) -> Option<LimitReached> {
630        let ranges = self
631            .query
632            .search(&snapshot, None)
633            .await
634            .iter()
635            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
636            .collect::<Vec<_>>();
637
638        let matched_ranges = ranges.len();
639        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
640            > Search::MAX_SEARCH_RESULT_FILES
641            || self
642                .matches_count
643                .fetch_add(matched_ranges, Ordering::Release)
644                > Search::MAX_SEARCH_RESULT_RANGES
645        {
646            _ = self.publish_matches.send(SearchResult::LimitReached).await;
647            Some(LimitReached)
648        } else {
649            _ = self
650                .publish_matches
651                .send(SearchResult::Buffer { buffer, ranges })
652                .await;
653            None
654        }
655    }
656    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
657        _=maybe!(async move {
658            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
659            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
660                return anyhow::Ok(());
661            };
662
663            let mut file = BufReader::new(file);
664            let file_start = file.fill_buf()?;
665
666            if let Err(Some(starting_position)) =
667            std::str::from_utf8(file_start).map_err(|e| e.error_len())
668            {
669                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
670                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
671                log::debug!(
672                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
673                );
674                return Ok(());
675            }
676
677            if self.query.detect(file).unwrap_or(false) {
678                // Yes, we should scan the whole file.
679                entry.should_scan_tx.send(entry.path).await?;
680            }
681            Ok(())
682        }).await;
683    }
684
685    async fn handle_scan_path(&self, req: InputPath) {
686        _ = maybe!(async move {
687            let InputPath {
688                entry,
689
690                snapshot,
691                should_scan_tx,
692            } = req;
693
694            if entry.is_fifo || !entry.is_file() {
695                return Ok(());
696            }
697
698            if self.query.filters_path() {
699                let matched_path = if self.query.match_full_paths() {
700                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
701                    full_path.push(entry.path.as_std_path());
702                    self.query.match_path(&full_path)
703                } else {
704                    self.query.match_path(entry.path.as_std_path())
705                };
706                if !matched_path {
707                    return Ok(());
708                }
709            }
710
711            if self.open_entries.contains(&entry.id) {
712                // The buffer is already in memory and that's the version we want to scan;
713                // hence skip the dilly-dally and look for all matches straight away.
714                self.get_buffer_for_full_scan_tx
715                    .send(ProjectPath {
716                        worktree_id: snapshot.id(),
717                        path: entry.path.clone(),
718                    })
719                    .await?;
720            } else {
721                self.confirm_contents_will_match_tx
722                    .send(MatchingEntry {
723                        should_scan_tx: should_scan_tx,
724                        worktree_root: snapshot.abs_path().clone(),
725                        path: ProjectPath {
726                            worktree_id: snapshot.id(),
727                            path: entry.path.clone(),
728                        },
729                    })
730                    .await?;
731            }
732
733            anyhow::Ok(())
734        })
735        .await;
736    }
737}
738
739struct InputPath {
740    entry: Entry,
741    snapshot: Snapshot,
742    should_scan_tx: oneshot::Sender<ProjectPath>,
743}
744
745struct MatchingEntry {
746    worktree_root: Arc<Path>,
747    path: ProjectPath,
748    should_scan_tx: oneshot::Sender<ProjectPath>,
749}