use std::{
    io::{BufRead, BufReader},
    ops::Range,
    path::Path,
    pin::pin,
    sync::Arc,
};

use anyhow::Context;
use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
use gpui::{App, AppContext, AsyncApp, Entity, Task};
use language::{Buffer, BufferSnapshot};
use parking_lot::Mutex;
use postage::oneshot;
use rpc::{AnyProtoClient, proto};
use smol::{
    channel::{Receiver, Sender, bounded, unbounded},
    future::FutureExt,
};

use text::BufferId;
use util::{ResultExt, maybe, paths::compare_rel_paths};
use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};

use crate::{
    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
    worktree_store::WorktreeStore,
};

pub struct Search {
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    limit: usize,
    kind: SearchKind,
}

/// Represents a search setup before it is actually kicked off with [`Search::into_handle`].
enum SearchKind {
    /// Search for candidates by inspecting file contents on the file system, avoiding loading a buffer unless we know that the given file contains a match.
    Local {
        fs: Arc<dyn Fs>,
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Query the remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
    Remote {
        client: AnyProtoClient,
        remote_id: u64,
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
    /// Run the search against a known set of candidates. Even when working with a remote host, this won't round-trip to the host.
    OpenBuffersOnly,
}

/// Represents the results of a project search and allows one to obtain either the match positions OR
/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining
/// full match positions, because in that case we look for at most one match in each file.
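///
/// A minimal sketch of consuming the handle (assuming an existing `search: Search`, a
/// `query: SearchQuery`, and an `App` context `cx`; marked `ignore` since it needs a live [`App`] to run):
///
/// ```ignore
/// let handle = search.into_handle(query, cx);
/// // Either stream the full results...
/// let results = handle.results(cx);
/// cx.background_spawn(async move {
///     while let Ok(result) = results.recv().await {
///         match result {
///             SearchResult::Buffer { buffer, ranges } => { /* report the matches */ }
///             SearchResult::LimitReached => break,
///         }
///     }
/// })
/// .detach();
/// // ...or call `matching_buffers(cx)` instead to only learn which buffers may contain a match.
/// ```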
#[must_use]
pub struct SearchResultsHandle {
    results: Receiver<SearchResult>,
    matching_buffers: Receiver<Entity<Buffer>>,
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}

impl SearchResultsHandle {
    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
        (self.trigger_search)(cx).detach();
        self.results
    }

    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
        (self.trigger_search)(cx).detach();
        self.matching_buffers
    }
}

#[derive(Clone)]
enum FindSearchCandidates {
    Local {
        fs: Arc<dyn Fs>,
        /// Start off with all paths in the project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        ///
        /// Put another way: filter out files that can't match (without looking at file contents).
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we'll figure out whether it contains at least one match
        /// based on the on-disk contents of the file. This step is not performed for buffers we already have in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
        /// Of those that contain at least one match (or are already in memory), look for the rest of the matches (and figure out their ranges).
        /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    },
    Remote,
    OpenBuffersOnly,
}

impl Search {
    pub fn local(
        fs: Arc<dyn Fs>,
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
        cx: &mut App,
    ) -> Self {
        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
        Self {
            kind: SearchKind::Local { fs, worktrees },
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) fn remote(
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
    ) -> Self {
        Self {
            kind: SearchKind::Remote {
                client: client_state.0,
                remote_id: client_state.1,
                models: client_state.2,
            },
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) fn open_buffers_only(
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
    ) -> Self {
        Self {
            kind: SearchKind::OpenBuffersOnly,
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
    /// or full search results.
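    ///
    /// A minimal sketch of kicking off a local search (assuming `fs`, `buffer_store`,
    /// `worktree_store`, a `query: SearchQuery` and an `App` context `cx` are available;
    /// marked `ignore` since it cannot run outside a live [`App`]):
    ///
    /// ```ignore
    /// let search = Search::local(fs, buffer_store, worktree_store, Search::MAX_SEARCH_RESULT_FILES, cx);
    /// let handle = search.into_handle(query, cx);
    /// // Nothing happens until one of the two accessors triggers the search:
    /// let matching_buffers = handle.matching_buffers(cx);
    /// ```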
    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                self.limit = self.limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            }
        }
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(move |cx: &mut App| {
            cx.spawn(async move |cx| {
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);

                let (candidate_searcher, tasks) = match self.kind {
                    SearchKind::OpenBuffersOnly => {
                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
                        else {
                            return;
                        };
                        let fill_requests = cx
                            .background_spawn(async move {
                                for buffer in open_buffers {
                                    if grab_buffer_snapshot_tx.send(buffer).await.is_err() {
                                        return;
                                    }
                                }
                            })
                            .boxed_local();
                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
                    }
                    SearchKind::Local {
                        fs,
                        ref mut worktrees,
                    } => {
                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
                            unbounded();
                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                            bounded(64);
                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();

                        let (input_paths_tx, input_paths_rx) = unbounded();

                        let tasks = vec![
                            cx.spawn(Self::provide_search_paths(
                                std::mem::take(worktrees),
                                query.include_ignored(),
                                input_paths_tx,
                                sorted_search_results_tx,
                            ))
                            .boxed_local(),
                            Self::open_buffers(
                                &self.buffer_store,
                                get_buffer_for_full_scan_rx,
                                grab_buffer_snapshot_tx,
                                cx.clone(),
                            )
                            .boxed_local(),
                            cx.background_spawn(Self::maintain_sorted_search_results(
                                sorted_search_results_rx,
                                get_buffer_for_full_scan_tx.clone(),
                                self.limit,
                            ))
                            .boxed_local(),
                        ];
                        (
                            FindSearchCandidates::Local {
                                fs,
                                get_buffer_for_full_scan_tx,
                                confirm_contents_will_match_tx,
                                confirm_contents_will_match_rx,
                                input_paths_rx,
                            },
                            tasks,
                        )
                    }
                    SearchKind::Remote {
                        client,
                        remote_id,
                        models,
                    } => {
                        let request = client.request(proto::FindSearchCandidates {
                            project_id: remote_id,
                            query: Some(query.to_proto()),
                            limit: self.limit as _,
                        });
                        let Ok(guard) = cx.update(|cx| {
                            Project::retain_remotely_created_models_impl(
                                &models,
                                &self.buffer_store,
                                &self.worktree_store,
                                cx,
                            )
                        }) else {
                            return;
                        };
                        let buffer_store = self.buffer_store.downgrade();
                        let issue_remote_buffers_request = cx
                            .spawn(async move |cx| {
                                let _ = maybe!(async move {
                                    let response = request.await?;
                                    log::debug!(
                                        "Received {} match candidates for a project search",
                                        response.buffer_ids.len()
                                    );
                                    for buffer_id in response.buffer_ids {
                                        let buffer_id = BufferId::new(buffer_id)?;
                                        let buffer = buffer_store
                                            .update(cx, |buffer_store, cx| {
                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
                                            })?
                                            .await?;
                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
                                    }

                                    drop(guard);
                                    anyhow::Ok(())
                                })
                                .await
                                .log_err();
                            })
                            .boxed_local();
                        (
                            FindSearchCandidates::Remote,
                            vec![issue_remote_buffers_request],
                        )
                    }
                };

                let should_find_all_matches = !tx.is_closed();

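                // Fan the candidate pipeline out across the background threads. Every worker owns a
                // clone of the channel endpoints; the originals are dropped right after spawning so
                // that each channel closes once the last worker holding it is done.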
                let worker_pool = executor.scoped(|scope| {
                    let num_cpus = executor.num_cpus();

                    assert!(num_cpus > 0);
                    // Leave one core for the foreground thread, but always spawn at least one worker.
                    for _ in 0..num_cpus.saturating_sub(1).max(1) {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &open_buffers,
                            candidates: candidate_searcher.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                        };
                        scope.spawn(worker.run());
                    }

                    drop(find_all_matches_rx);
                    drop(candidate_searcher);
                });

                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
                // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
                // just the files. *They use the same stream as the guts of the project search do.*
                // This means that we cannot grab values off of that stream unless it's strictly needed for making progress in the project search.
                //
                // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
                // in all matches, running that task unconditionally would hinder the caller's ability to observe all matching file paths.
                let buffer_snapshots = if should_find_all_matches {
                    Some(
                        Self::grab_buffer_snapshots(
                            grab_buffer_snapshot_rx,
                            find_all_matches_tx,
                            sorted_matches_tx,
                            cx.clone(),
                        )
                        .boxed_local(),
                    )
                } else {
                    drop(find_all_matches_tx);

                    None
                };
                let ensure_matches_are_reported_in_order = if should_find_all_matches {
                    Some(
                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
                            .boxed_local(),
                    )
                } else {
                    drop(tx);
                    None
                };

                futures::future::join_all(
                    [worker_pool.boxed_local()]
                        .into_iter()
                        .chain(buffer_snapshots)
                        .chain(ensure_matches_are_reported_in_order)
                        .chain(tasks),
                )
                .await;
            })
        });

        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }

    fn provide_search_paths(
        worktrees: Vec<Entity<Worktree>>,
        include_ignored: bool,
        tx: Sender<InputPath>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> impl AsyncFnOnce(&mut AsyncApp) {
        async move |cx| {
            _ = maybe!(async move {
                for worktree in worktrees {
                    let (mut snapshot, worktree_settings) = worktree
                        .read_with(cx, |this, _| {
                            Some((this.snapshot(), this.as_local()?.settings()))
                        })?
                        .context("The worktree is not local")?;
                    if include_ignored {
                        // Pre-fetch all of the ignored directories as they're going to be searched.
                        let mut entries_to_refresh = vec![];
                        for entry in snapshot.entries(include_ignored, 0) {
                            if entry.is_ignored
                                && entry.kind.is_unloaded()
                                && !worktree_settings.is_path_excluded(&entry.path)
                            {
                                entries_to_refresh.push(entry.path.clone());
                            }
                        }
                        let barrier = worktree.update(cx, |this, _| {
                            let local = this.as_local_mut()?;
                            let barrier = entries_to_refresh
                                .into_iter()
                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
                                .collect::<Vec<_>>();
                            Some(barrier)
                        })?;
                        if let Some(barriers) = barrier {
                            futures::future::join_all(barriers).await;
                        }
                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
                    }
                    cx.background_executor()
                        .scoped(|scope| {
                            scope.spawn(async {
                                for entry in snapshot.files(include_ignored, 0) {
                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
                                    let Ok(_) = tx
                                        .send(InputPath {
                                            entry: entry.clone(),
                                            snapshot: snapshot.clone(),
                                            should_scan_tx,
                                        })
                                        .await
                                    else {
                                        return;
                                    };
                                    if results.send(should_scan_rx).await.is_err() {
                                        return;
                                    };
                                }
                            })
                        })
                        .await;
                }
                anyhow::Ok(())
            })
            .await;
        }
    }

    async fn maintain_sorted_search_results(
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
        limit: usize,
    ) {
        let mut rx = pin!(rx);
        let mut matched = 0;
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This file did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
            matched += 1;
            if matched >= limit {
                break;
            }
        }
    }

    /// Background workers cannot open buffers by themselves, hence the main thread does it on their behalf.
    async fn open_buffers(
        buffer_store: &Entity<BufferStore>,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<Entity<Buffer>>,
        mut cx: AsyncApp,
    ) {
        let mut rx = pin!(rx.ready_chunks(64));
        _ = maybe!(async move {
            while let Some(requested_paths) = rx.next().await {
                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
                    requested_paths
                        .into_iter()
                        .map(|path| this.open_buffer(path, cx))
                        .collect::<FuturesOrdered<_>>()
                })?;

                while let Some(buffer) = buffers.next().await {
                    if let Some(buffer) = buffer.log_err() {
                        find_all_matches_tx.send(buffer).await?;
                    }
                }
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

    async fn grab_buffer_snapshots(
        rx: Receiver<Entity<Buffer>>,
        find_all_matches_tx: Sender<(
            Entity<Buffer>,
            BufferSnapshot,
            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
        )>,
        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(buffer) = rx.recv().await {
                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
                let (tx, rx) = oneshot::channel();
                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
                results.send(rx).await?;
            }
            debug_assert!(rx.is_empty());
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

    async fn ensure_matched_ranges_are_reported_in_order(
        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
        tx: Sender<SearchResult>,
    ) {
        use postage::stream::Stream;
        _ = maybe!(async move {
            let mut matched_buffers = 0;
            let mut matches = 0;
            while let Ok(mut next_buffer_matches) = rx.recv().await {
                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
                    continue;
                };

                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
                    || matches > Search::MAX_SEARCH_RESULT_RANGES
                {
                    _ = tx.send(SearchResult::LimitReached).await;
                    break;
                }
                matched_buffers += 1;
                matches += ranges.len();

                tx.send(SearchResult::Buffer { buffer, ranges }).await?;
            }
            anyhow::Ok(())
        })
        .await;
    }

    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
        let worktree_store = self.worktree_store.read(cx);
        let mut buffers = search_query
            .buffers()
            .into_iter()
            .flatten()
            .filter(|buffer| {
                let b = buffer.read(cx);
                if let Some(file) = b.file() {
                    if !search_query.match_path(file.path().as_std_path()) {
                        return false;
                    }
                    if !search_query.include_ignored()
                        && let Some(entry) = b
                            .entry_id(cx)
                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
                        && entry.is_ignored
                    {
                        return false;
                    }
                }
                true
            })
            .cloned()
            .collect::<Vec<_>>();
        buffers.sort_by(|a, b| {
            let a = a.read(cx);
            let b = b.read(cx);
            match (a.file(), b.file()) {
                (None, None) => a.remote_id().cmp(&b.remote_id()),
                (None, Some(_)) => std::cmp::Ordering::Less,
                (Some(_), None) => std::cmp::Ordering::Greater,
                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
            }
        });

        buffers
    }
}

struct Worker<'search> {
    query: &'search SearchQuery,
    open_buffers: &'search HashSet<ProjectEntryId>,
    candidates: FindSearchCandidates,
    /// Ok, we're back in the background: run a full scan & find all matches in a given buffer snapshot.
    /// Then, when you're done, share them via the channel you were given.
    find_all_matches_rx: Receiver<(
        Entity<Buffer>,
        BufferSnapshot,
        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
    )>,
}

impl Worker<'_> {
    async fn run(self) {
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            mut get_buffer_for_full_scan_tx,
            fs,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                fs,
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx,
                Some(fs),
            ),
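            // Remote and open-buffers-only searches skip the local candidate pipeline entirely, so
            // those workers get dummy channels: the other end of each is dropped immediately, which
            // means the dummy receivers terminate right away and the dummy senders are never used.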
            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => (
                unbounded().1,
                unbounded().1,
                unbounded().0,
                unbounded().0,
                None,
            ),
        };
        // WorkerA: grabs a request for "find all matches in file/a" <- takes 5 minutes
        // right after, WorkerB: grabs a request for "find all matches in file/b" <- takes 5 seconds
        // i.e. results can become ready out of order, which is why they are re-sequenced downstream
        // before being reported.
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: fs.as_deref(),
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
            };
            // Whenever we notice that some step of the pipeline is closed, we don't want to close subsequent
            // steps straight away. Another worker might be about to produce a value that will
            // be pushed there, thus we replace the current worker's pipe with a dummy one.
            // That way, we only ever close a next-stage channel when ALL workers do so.
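            // `select_biased!` polls the arms in the order they are written, so a worker prefers
            // draining the later pipeline stages (full scans first, then first-match confirmation)
            // before pulling brand-new paths off the scan queue.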
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    let Some(matches) = find_all_matches else {
                        continue;
                    };
                    handler.handle_find_all_matches(matches).await;
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        get_buffer_for_full_scan_tx = bounded(1).0;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this stage is no longer producing
                        // values, let the next stage's channel close by dropping our sender.
                        confirm_contents_will_match_tx = bounded(1).0;
                    }
                }
                complete => {
                    break
                },
            }
        }
    }
}

struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: Option<&'worker dyn Fs>,
    open_entries: &'worker HashSet<ProjectEntryId>,
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
}

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot, mut report_matches): (
            Entity<Buffer>,
            BufferSnapshot,
            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
        ),
    ) {
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        _ = report_matches.send((buffer, ranges)).await;
    }

    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self
                .fs
                .context("Trying to query the filesystem in a remote project search")?
                .open_sync(&abs_path)
                .await
                .log_err()
            else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
                // that way we can still match files in a streaming fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                return Ok(());
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

    async fn handle_scan_path(&self, req: InputPath) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                snapshot,
                should_scan_tx,
            } = req;

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

struct InputPath {
    entry: Entry,
    snapshot: Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}