project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    ops::Range,
  4    path::Path,
  5    pin::pin,
  6    sync::Arc,
  7};
  8
  9use anyhow::Context;
 10use collections::HashSet;
 11use fs::Fs;
 12use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
 13use gpui::{App, AppContext, AsyncApp, Entity, Task};
 14use language::{Buffer, BufferSnapshot};
 15use parking_lot::Mutex;
 16use postage::oneshot;
 17use rpc::{AnyProtoClient, proto};
 18use smol::{
 19    channel::{Receiver, Sender, bounded, unbounded},
 20    future::FutureExt,
 21};
 22
 23use text::BufferId;
 24use util::{ResultExt, maybe, paths::compare_rel_paths};
 25use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
 26
 27use crate::{
 28    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
 29    buffer_store::BufferStore,
 30    search::{SearchQuery, SearchResult},
 31    worktree_store::WorktreeStore,
 32};
 33
/// A project search that has been configured but not started yet.
///
/// Constructed with [`Search::local`], [`Search::remote`] or [`Search::open_buffers_only`]
/// and consumed by [`Search::into_handle`].
pub struct Search {
    /// Store that is consulted for already-open buffers and used to open candidate buffers.
    buffer_store: Entity<BufferStore>,
    /// Store used to look up worktree entries of open buffers.
    worktree_store: Entity<WorktreeStore>,
    /// Cap on the number of candidate files forwarded for a full scan
    /// (also sent as the `limit` of a remote candidate request).
    limit: usize,
    /// How candidates for the search are discovered.
    kind: SearchKind,
}
 40
/// Represents search setup, before it is actually kicked off with [`Search::into_handle`].
enum SearchKind {
    /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match.
    Local {
        /// File system used to read candidate files from disk.
        fs: Arc<dyn Fs>,
        /// Worktrees whose files are fed into the search.
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
    Remote {
        /// Client used to issue the `FindSearchCandidates` request.
        client: AnyProtoClient,
        /// Sent as the `project_id` of the request.
        remote_id: u64,
        /// Handed to `Project::retain_remotely_created_models_impl` before the response is processed.
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
    /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host.
    OpenBuffersOnly,
}
 57
/// Represents results of project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for
/// at most one match in each file.
///
/// The search does not start until one of the accessors is called.
#[must_use]
pub struct SearchResultsHandle {
    /// Receives full results (a buffer together with its matched ranges).
    results: Receiver<SearchResult>,
    /// Receives handles of buffers that are candidates for a match.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// Deferred closure that spawns the search task when invoked.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
 67
 68impl SearchResultsHandle {
 69    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
 70        (self.trigger_search)(cx).detach();
 71        self.results
 72    }
 73    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
 74        (self.trigger_search)(cx).detach();
 75        self.matching_buffers
 76    }
 77}
 78
/// Per-worker view of the candidate-discovery pipeline. Every worker receives its own clone.
#[derive(Clone)]
enum FindSearchCandidates {
    /// Candidates are discovered by scanning files of local worktrees.
    Local {
        /// File system used to open candidate files.
        fs: Arc<dyn Fs>,
        /// Start off with all paths in project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        /// Put another way: filter out files that can't match (without looking at file contents)
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        /// Receiving half of the "confirm contents" stage; shared between the workers.
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
        /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
        /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    },
    /// Candidates come from the remote host; workers have no paths to scan.
    Remote,
    /// Candidates are the already-open buffers; workers have no paths to scan.
    OpenBuffersOnly,
}
101
102impl Search {
103    pub fn local(
104        fs: Arc<dyn Fs>,
105        buffer_store: Entity<BufferStore>,
106        worktree_store: Entity<WorktreeStore>,
107        limit: usize,
108        cx: &mut App,
109    ) -> Self {
110        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
111        Self {
112            kind: SearchKind::Local { fs, worktrees },
113            buffer_store,
114            worktree_store,
115            limit,
116        }
117    }
118
119    pub(crate) fn remote(
120        buffer_store: Entity<BufferStore>,
121        worktree_store: Entity<WorktreeStore>,
122        limit: usize,
123        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
124    ) -> Self {
125        Self {
126            kind: SearchKind::Remote {
127                client: client_state.0,
128                remote_id: client_state.1,
129                models: client_state.2,
130            },
131            buffer_store,
132            worktree_store,
133            limit,
134        }
135    }
136    pub(crate) fn open_buffers_only(
137        buffer_store: Entity<BufferStore>,
138        worktree_store: Entity<WorktreeStore>,
139        limit: usize,
140    ) -> Self {
141        Self {
142            kind: SearchKind::OpenBuffersOnly,
143            buffer_store,
144            worktree_store,
145            limit,
146        }
147    }
148
    /// Once more than this many buffers have been reported, result reporting emits
    /// `SearchResult::LimitReached` and stops.
    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    /// Once more than this many matched ranges have been reported, result reporting emits
    /// `SearchResult::LimitReached` and stops.
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
151    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
152    /// or full search results.
153    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
154        let mut open_buffers = HashSet::default();
155        let mut unnamed_buffers = Vec::new();
156        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
157        let buffers = self.buffer_store.read(cx);
158        for handle in buffers.buffers() {
159            let buffer = handle.read(cx);
160            if !buffers.is_searchable(&buffer.remote_id()) {
161                continue;
162            } else if let Some(entry_id) = buffer.entry_id(cx) {
163                open_buffers.insert(entry_id);
164            } else {
165                self.limit = self.limit.saturating_sub(1);
166                unnamed_buffers.push(handle)
167            };
168        }
169        let executor = cx.background_executor().clone();
170        let (tx, rx) = unbounded();
171        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
172        let matching_buffers = grab_buffer_snapshot_rx.clone();
173        let trigger_search = Box::new(move |cx: &mut App| {
174            cx.spawn(async move |cx| {
175                for buffer in unnamed_buffers {
176                    _ = grab_buffer_snapshot_tx.send(buffer).await;
177                }
178
179                let (find_all_matches_tx, find_all_matches_rx) =
180                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
181
182                let (candidate_searcher, tasks) = match self.kind {
183                    SearchKind::OpenBuffersOnly => {
184                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
185                        else {
186                            return;
187                        };
188                        let fill_requests = cx
189                            .background_spawn(async move {
190                                for buffer in open_buffers {
191                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
192                                        return;
193                                    }
194                                }
195                            })
196                            .boxed_local();
197                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
198                    }
199                    SearchKind::Local {
200                        fs,
201                        ref mut worktrees,
202                    } => {
203                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
204                            unbounded();
205                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
206                            bounded(64);
207                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
208
209                        let (input_paths_tx, input_paths_rx) = unbounded();
210
211                        let tasks = vec![
212                            cx.spawn(Self::provide_search_paths(
213                                std::mem::take(worktrees),
214                                query.include_ignored(),
215                                input_paths_tx,
216                                sorted_search_results_tx,
217                            ))
218                            .boxed_local(),
219                            Self::open_buffers(
220                                &self.buffer_store,
221                                get_buffer_for_full_scan_rx,
222                                grab_buffer_snapshot_tx,
223                                cx.clone(),
224                            )
225                            .boxed_local(),
226                            cx.background_spawn(Self::maintain_sorted_search_results(
227                                sorted_search_results_rx,
228                                get_buffer_for_full_scan_tx.clone(),
229                                self.limit,
230                            ))
231                            .boxed_local(),
232                        ];
233                        (
234                            FindSearchCandidates::Local {
235                                fs,
236                                get_buffer_for_full_scan_tx,
237                                confirm_contents_will_match_tx,
238                                confirm_contents_will_match_rx,
239                                input_paths_rx,
240                            },
241                            tasks,
242                        )
243                    }
244                    SearchKind::Remote {
245                        client,
246                        remote_id,
247                        models,
248                    } => {
249                        let request = client.request(proto::FindSearchCandidates {
250                            project_id: remote_id,
251                            query: Some(query.to_proto()),
252                            limit: self.limit as _,
253                        });
254                        let Ok(guard) = cx.update(|cx| {
255                            Project::retain_remotely_created_models_impl(
256                                &models,
257                                &self.buffer_store,
258                                &self.worktree_store,
259                                cx,
260                            )
261                        }) else {
262                            return;
263                        };
264                        let buffer_store = self.buffer_store.downgrade();
265                        let issue_remote_buffers_request = cx
266                            .spawn(async move |cx| {
267                                let _ = maybe!(async move {
268                                    let response = request.await?;
269                                    log::error!(
270                                        "Received {} match candidates for a project search",
271                                        response.buffer_ids.len()
272                                    );
273                                    for buffer_id in response.buffer_ids {
274                                        let buffer_id = BufferId::new(buffer_id)?;
275                                        let buffer = buffer_store
276                                            .update(cx, |buffer_store, cx| {
277                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
278                                            })?
279                                            .await?;
280                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
281                                    }
282
283                                    drop(guard);
284                                    anyhow::Ok(())
285                                })
286                                .await
287                                .log_err();
288                            })
289                            .boxed_local();
290                        (
291                            FindSearchCandidates::Remote,
292                            vec![issue_remote_buffers_request],
293                        )
294                    }
295                };
296
297                let should_find_all_matches = !tx.is_closed();
298
299                let worker_pool = executor.scoped(|scope| {
300                    let num_cpus = executor.num_cpus();
301
302                    assert!(num_cpus > 0);
303                    for _ in 0..executor.num_cpus() - 1 {
304                        let worker = Worker {
305                            query: &query,
306                            open_buffers: &open_buffers,
307                            candidates: candidate_searcher.clone(),
308                            find_all_matches_rx: find_all_matches_rx.clone(),
309                        };
310                        scope.spawn(worker.run());
311                    }
312
313                    drop(find_all_matches_rx);
314                    drop(candidate_searcher);
315                });
316
317                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
318                // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
319                // just the files. *They are using the same stream as the guts of the project search do*.
320                // This means that we cannot grab values off of that stream unless it's strictly needed for making a progress in project search.
321                //
322                // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
323                // in all matches, running that task unconditionally would hinder caller's ability to observe all matching file paths.
324                let buffer_snapshots = if should_find_all_matches {
325                    Some(
326                        Self::grab_buffer_snapshots(
327                            grab_buffer_snapshot_rx,
328                            find_all_matches_tx,
329                            sorted_matches_tx,
330                            cx.clone(),
331                        )
332                        .boxed_local(),
333                    )
334                } else {
335                    drop(find_all_matches_tx);
336
337                    None
338                };
339                let ensure_matches_are_reported_in_order = if should_find_all_matches {
340                    Some(
341                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
342                            .boxed_local(),
343                    )
344                } else {
345                    drop(tx);
346                    None
347                };
348
349                futures::future::join_all(
350                    [worker_pool.boxed_local()]
351                        .into_iter()
352                        .chain(buffer_snapshots)
353                        .chain(ensure_matches_are_reported_in_order)
354                        .chain(tasks),
355                )
356                .await;
357            })
358        });
359
360        SearchResultsHandle {
361            results: rx,
362            matching_buffers,
363            trigger_search,
364        }
365    }
366
367    fn provide_search_paths(
368        worktrees: Vec<Entity<Worktree>>,
369        include_ignored: bool,
370        tx: Sender<InputPath>,
371        results: Sender<oneshot::Receiver<ProjectPath>>,
372    ) -> impl AsyncFnOnce(&mut AsyncApp) {
373        async move |cx| {
374            _ = maybe!(async move {
375                for worktree in worktrees {
376                    let (mut snapshot, worktree_settings) = worktree
377                        .read_with(cx, |this, _| {
378                            Some((this.snapshot(), this.as_local()?.settings()))
379                        })?
380                        .context("The worktree is not local")?;
381                    if include_ignored {
382                        // Pre-fetch all of the ignored directories as they're going to be searched.
383                        let mut entries_to_refresh = vec![];
384                        for entry in snapshot.entries(include_ignored, 0) {
385                            if entry.is_ignored && entry.kind.is_unloaded() {
386                                if !worktree_settings.is_path_excluded(&entry.path) {
387                                    entries_to_refresh.push(entry.path.clone());
388                                }
389                            }
390                        }
391                        let barrier = worktree.update(cx, |this, _| {
392                            let local = this.as_local_mut()?;
393                            let barrier = entries_to_refresh
394                                .into_iter()
395                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
396                                .collect::<Vec<_>>();
397                            Some(barrier)
398                        })?;
399                        if let Some(barriers) = barrier {
400                            futures::future::join_all(barriers).await;
401                        }
402                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
403                    }
404                    cx.background_executor()
405                        .scoped(|scope| {
406                            scope.spawn(async {
407                                for entry in snapshot.files(include_ignored, 0) {
408                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
409                                    let Ok(_) = tx
410                                        .send(InputPath {
411                                            entry: entry.clone(),
412                                            snapshot: snapshot.clone(),
413                                            should_scan_tx,
414                                        })
415                                        .await
416                                    else {
417                                        return;
418                                    };
419                                    if results.send(should_scan_rx).await.is_err() {
420                                        return;
421                                    };
422                                }
423                            })
424                        })
425                        .await;
426                }
427                anyhow::Ok(())
428            })
429            .await;
430        }
431    }
432
433    async fn maintain_sorted_search_results(
434        rx: Receiver<oneshot::Receiver<ProjectPath>>,
435        paths_for_full_scan: Sender<ProjectPath>,
436        limit: usize,
437    ) {
438        let mut rx = pin!(rx);
439        let mut matched = 0;
440        while let Some(mut next_path_result) = rx.next().await {
441            let Some(successful_path) = next_path_result.next().await else {
442                // This file did not produce a match, hence skip it.
443                continue;
444            };
445            if paths_for_full_scan.send(successful_path).await.is_err() {
446                return;
447            };
448            matched += 1;
449            if matched >= limit {
450                break;
451            }
452        }
453    }
454
455    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
456    async fn open_buffers(
457        buffer_store: &Entity<BufferStore>,
458        rx: Receiver<ProjectPath>,
459        find_all_matches_tx: Sender<Entity<Buffer>>,
460        mut cx: AsyncApp,
461    ) {
462        let mut rx = pin!(rx.ready_chunks(64));
463        _ = maybe!(async move {
464            while let Some(requested_paths) = rx.next().await {
465                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
466                    requested_paths
467                        .into_iter()
468                        .map(|path| this.open_buffer(path, cx))
469                        .collect::<FuturesOrdered<_>>()
470                })?;
471
472                while let Some(buffer) = buffers.next().await {
473                    if let Some(buffer) = buffer.log_err() {
474                        find_all_matches_tx.send(buffer).await?;
475                    }
476                }
477            }
478            Result::<_, anyhow::Error>::Ok(())
479        })
480        .await;
481    }
482
483    async fn grab_buffer_snapshots(
484        rx: Receiver<Entity<Buffer>>,
485        find_all_matches_tx: Sender<(
486            Entity<Buffer>,
487            BufferSnapshot,
488            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
489        )>,
490        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
491        mut cx: AsyncApp,
492    ) {
493        _ = maybe!(async move {
494            while let Ok(buffer) = rx.recv().await {
495                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
496                let (tx, rx) = oneshot::channel();
497                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
498                results.send(rx).await?;
499            }
500            debug_assert!(rx.is_empty());
501            Result::<_, anyhow::Error>::Ok(())
502        })
503        .await;
504    }
505
506    async fn ensure_matched_ranges_are_reported_in_order(
507        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
508        tx: Sender<SearchResult>,
509    ) {
510        use postage::stream::Stream;
511        _ = maybe!(async move {
512            let mut matched_buffers = 0;
513            let mut matches = 0;
514            while let Ok(mut next_buffer_matches) = rx.recv().await {
515                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
516                    continue;
517                };
518
519                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
520                    || matches > Search::MAX_SEARCH_RESULT_RANGES
521                {
522                    _ = tx.send(SearchResult::LimitReached).await;
523                    break;
524                }
525                matched_buffers += 1;
526                matches += ranges.len();
527
528                _ = tx.send(SearchResult::Buffer { buffer, ranges }).await?;
529            }
530            anyhow::Ok(())
531        })
532        .await;
533    }
534
535    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
536        let worktree_store = self.worktree_store.read(cx);
537        let mut buffers = search_query
538            .buffers()
539            .into_iter()
540            .flatten()
541            .filter(|buffer| {
542                let b = buffer.read(cx);
543                if let Some(file) = b.file() {
544                    if !search_query.match_path(file.path().as_std_path()) {
545                        return false;
546                    }
547                    if !search_query.include_ignored()
548                        && let Some(entry) = b
549                            .entry_id(cx)
550                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
551                        && entry.is_ignored
552                    {
553                        return false;
554                    }
555                }
556                true
557            })
558            .cloned()
559            .collect::<Vec<_>>();
560        buffers.sort_by(|a, b| {
561            let a = a.read(cx);
562            let b = b.read(cx);
563            match (a.file(), b.file()) {
564                (None, None) => a.remote_id().cmp(&b.remote_id()),
565                (None, Some(_)) => std::cmp::Ordering::Less,
566                (Some(_), None) => std::cmp::Ordering::Greater,
567                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
568            }
569        });
570
571        buffers
572    }
573}
574
/// A background worker of the search pipeline. Several workers run concurrently and share
/// the same channels.
struct Worker<'search> {
    /// The query every request is evaluated against.
    query: &'search SearchQuery,
    /// Entries of buffers that were already open when the search was set up.
    open_buffers: &'search HashSet<ProjectEntryId>,
    /// This worker's clone of the candidate-discovery channels.
    candidates: FindSearchCandidates,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    /// Then, when you're done, share them via the channel you were given.
    find_all_matches_rx: Receiver<(
        Entity<Buffer>,
        BufferSnapshot,
        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
    )>,
}
587
588impl Worker<'_> {
589    async fn run(self) {
590        let (
591            input_paths_rx,
592            confirm_contents_will_match_rx,
593            mut confirm_contents_will_match_tx,
594            mut get_buffer_for_full_scan_tx,
595            fs,
596        ) = match self.candidates {
597            FindSearchCandidates::Local {
598                fs,
599                input_paths_rx,
600                confirm_contents_will_match_rx,
601                confirm_contents_will_match_tx,
602                get_buffer_for_full_scan_tx,
603            } => (
604                input_paths_rx,
605                confirm_contents_will_match_rx,
606                confirm_contents_will_match_tx,
607                get_buffer_for_full_scan_tx,
608                Some(fs),
609            ),
610            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => (
611                unbounded().1,
612                unbounded().1,
613                unbounded().0,
614                unbounded().0,
615                None,
616            ),
617        };
618        // WorkerA: grabs a request for "find all matches in file/a" <- takes 5 minutes
619        // right after: WorkerB: grabs a request for "find all matches in file/b" <- takes 5 seconds
620        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
621        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
622        let mut scan_path = pin!(input_paths_rx.fuse());
623
624        loop {
625            let handler = RequestHandler {
626                query: self.query,
627                open_entries: &self.open_buffers,
628                fs: fs.as_deref(),
629                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
630                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
631            };
632            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
633            // steps straight away. Another worker might be about to produce a value that will
634            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
635            // That way, we'll only ever close a next-stage channel when ALL workers do so.
636            select_biased! {
637                find_all_matches = find_all_matches.next() => {
638                    let Some(matches) = find_all_matches else {
639                        continue;
640                    };
641                    handler.handle_find_all_matches(matches).await;
642                },
643                find_first_match = find_first_match.next() => {
644                    if let Some(buffer_with_at_least_one_match) = find_first_match {
645                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
646                    } else {
647                        get_buffer_for_full_scan_tx = bounded(1).0;
648                    }
649
650                },
651                scan_path = scan_path.next() => {
652                    if let Some(path_to_scan) = scan_path {
653                        handler.handle_scan_path(path_to_scan).await;
654                    } else {
655                        // If we're the last worker to notice that this is not producing values, close the upstream.
656                        confirm_contents_will_match_tx = bounded(1).0;
657                    }
658
659                 }
660                 complete => {
661                     break
662                },
663
664            }
665        }
666    }
667}
668
/// Borrowed view of a worker's state that is used to serve a single request.
struct RequestHandler<'worker> {
    /// The query requests are evaluated against.
    query: &'worker SearchQuery,
    /// File system for reading files from disk; `None` when the search is not local.
    fs: Option<&'worker dyn Fs>,
    /// Entries of buffers that are already open.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Where paths that passed the path filters are sent for a content check.
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    /// Where paths of already-open buffers are sent, skipping the content check.
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
}
676
677impl RequestHandler<'_> {
678    async fn handle_find_all_matches(
679        &self,
680        (buffer, snapshot, mut report_matches): (
681            Entity<Buffer>,
682            BufferSnapshot,
683            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
684        ),
685    ) {
686        let ranges = self
687            .query
688            .search(&snapshot, None)
689            .await
690            .iter()
691            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
692            .collect::<Vec<_>>();
693
694        _ = report_matches.send((buffer, ranges)).await;
695    }
696
697    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
698        _=maybe!(async move {
699            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
700            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
701                return anyhow::Ok(());
702            };
703
704            let mut file = BufReader::new(file);
705            let file_start = file.fill_buf()?;
706
707            if let Err(Some(starting_position)) =
708            std::str::from_utf8(file_start).map_err(|e| e.error_len())
709            {
710                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
711                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
712                log::debug!(
713                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
714                );
715                return Ok(());
716            }
717
718            if self.query.detect(file).unwrap_or(false) {
719                // Yes, we should scan the whole file.
720                entry.should_scan_tx.send(entry.path).await?;
721            }
722            Ok(())
723        }).await;
724    }
725
726    async fn handle_scan_path(&self, req: InputPath) {
727        _ = maybe!(async move {
728            let InputPath {
729                entry,
730
731                snapshot,
732                should_scan_tx,
733            } = req;
734
735            if entry.is_fifo || !entry.is_file() {
736                return Ok(());
737            }
738
739            if self.query.filters_path() {
740                let matched_path = if self.query.match_full_paths() {
741                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
742                    full_path.push(entry.path.as_std_path());
743                    self.query.match_path(&full_path)
744                } else {
745                    self.query.match_path(entry.path.as_std_path())
746                };
747                if !matched_path {
748                    return Ok(());
749                }
750            }
751
752            if self.open_entries.contains(&entry.id) {
753                // The buffer is already in memory and that's the version we want to scan;
754                // hence skip the dilly-dally and look for all matches straight away.
755                self.get_buffer_for_full_scan_tx
756                    .send(ProjectPath {
757                        worktree_id: snapshot.id(),
758                        path: entry.path.clone(),
759                    })
760                    .await?;
761            } else {
762                self.confirm_contents_will_match_tx
763                    .send(MatchingEntry {
764                        should_scan_tx: should_scan_tx,
765                        worktree_root: snapshot.abs_path().clone(),
766                        path: ProjectPath {
767                            worktree_id: snapshot.id(),
768                            path: entry.path.clone(),
769                        },
770                    })
771                    .await?;
772            }
773
774            anyhow::Ok(())
775        })
776        .await;
777    }
778}
779
/// A file of a worktree that enters the search pipeline.
struct InputPath {
    /// The worktree entry of the file.
    entry: Entry,
    /// Snapshot of the worktree the entry was taken from.
    snapshot: Snapshot,
    /// Used to report the path once the file is known to contain a match.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
785
/// A file that passed the path filters and whose contents still have to be checked for a match.
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path` to locate the file on disk.
    worktree_root: Arc<Path>,
    /// Project path of the file.
    path: ProjectPath,
    /// Used to report `path` once the file is known to contain a match.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}