use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use anyhow::Context;
use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
use gpui::{App, AppContext, AsyncApp, Entity, Task};
use language::{Buffer, BufferSnapshot};
use parking_lot::Mutex;
use postage::oneshot;
use rpc::{AnyProtoClient, proto};
use smol::{
    channel::{Receiver, Sender, bounded, unbounded},
    future::FutureExt,
};

use text::BufferId;
use util::{ResultExt, maybe, paths::compare_rel_paths};
use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};

use crate::{
    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
    worktree_store::WorktreeStore,
};

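/// A project search that has been configured but not yet started. Construct it with
/// [`Search::local`], [`Search::remote`] or [`Search::open_buffers_only`], then kick it off with
/// [`Search::into_handle`].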
pub struct Search {
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    limit: usize,
    kind: SearchKind,
}

/// Represents the search setup, before it is actually kicked off with [`Search::into_handle`].
enum SearchKind {
    /// Search for candidates by inspecting file contents on the file system, avoiding loading a
    /// buffer unless we know that the corresponding file contains a match.
    Local {
        fs: Arc<dyn Fs>,
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Query the remote host for candidates. As of writing, the host runs a local search in
    /// "buffers with matches only" mode.
    Remote {
        client: AnyProtoClient,
        remote_id: u64,
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
    /// Run the search against a known set of candidates. Even when working with a remote host,
    /// this won't round-trip to the host.
    OpenBuffersOnly,
}

/// Represents the results of a project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than
/// obtaining full match positions, because in that case we'll look for at most one match in each file.
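///
/// A minimal, illustrative sketch of the cheaper path (assuming a `handle` obtained from
/// [`Search::into_handle`] and an async context):
///
/// ```ignore
/// let buffers = handle.matching_buffers(cx);
/// while let Ok(buffer) = buffers.recv().await {
///     // Each buffer is only known to contain at least one match.
/// }
/// ```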
#[must_use]
pub struct SearchResultsHandle {
    results: Receiver<SearchResult>,
    matching_buffers: Receiver<Entity<Buffer>>,
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}

impl SearchResultsHandle {
    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
        (self.trigger_search)(cx).detach();
        self.results
    }
    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
        (self.trigger_search)(cx).detach();
        self.matching_buffers
    }
}

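/// Per-worker view of the candidate-finding pipeline. Each background [`Worker`] holds a clone of
/// this; for local searches it carries the channel endpoints connecting the stages described on
/// the fields below (filter paths, confirm a match on disk, then fully scan the opened buffer).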
#[derive(Clone)]
enum FindSearchCandidates {
    Local {
        fs: Arc<dyn Fs>,
        /// Start off with all paths in the project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        ///
        /// Put another way: filter out files that can't match (without looking at file contents).
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we figure out whether it contains at least one match
        /// based on the on-disk contents of the file. This step is not performed for buffers we already have in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
        /// Of those that contain at least one match (or are already in memory), look for the rest of the
        /// matches (and figure out their ranges). But wait - first, we need to go back to the main thread
        /// to open a buffer (& create an entity for it).
        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    },
    Remote,
    OpenBuffersOnly,
}

impl Search {
    pub fn local(
        fs: Arc<dyn Fs>,
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
        cx: &mut App,
    ) -> Self {
        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
        Self {
            kind: SearchKind::Local { fs, worktrees },
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) fn remote(
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
    ) -> Self {
        Self {
            kind: SearchKind::Remote {
                client: client_state.0,
                remote_id: client_state.1,
                models: client_state.2,
            },
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) fn open_buffers_only(
        buffer_store: Entity<BufferStore>,
        worktree_store: Entity<WorktreeStore>,
        limit: usize,
    ) -> Self {
        Self {
            kind: SearchKind::OpenBuffersOnly,
            buffer_store,
            worktree_store,
            limit,
        }
    }

    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to
    /// specify whether you're interested in matching buffers or full search results.
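    ///
    /// A rough usage sketch (the `fs`, `buffer_store`, `worktree_store`, `limit` and `query`
    /// values are assumed to come from the caller's context):
    ///
    /// ```ignore
    /// let handle = Search::local(fs, buffer_store, worktree_store, limit, cx).into_handle(query, cx);
    /// let results = handle.results(cx);
    /// while let Ok(result) = results.recv().await {
    ///     match result {
    ///         SearchResult::Buffer { buffer, ranges } => { /* use the matches */ }
    ///         SearchResult::LimitReached => break,
    ///     }
    /// }
    /// ```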
    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                self.limit = self.limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(move |cx: &mut App| {
            cx.spawn(async move |cx| {
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);

                let (candidate_searcher, tasks) = match self.kind {
                    SearchKind::OpenBuffersOnly => {
                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
                        else {
                            return;
                        };
                        let fill_requests = cx
                            .background_spawn(async move {
                                for buffer in open_buffers {
                                    if grab_buffer_snapshot_tx.send(buffer).await.is_err() {
                                        return;
                                    }
                                }
                            })
                            .boxed_local();
                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
                    }
                    SearchKind::Local {
                        fs,
                        ref mut worktrees,
                    } => {
                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
                            unbounded();
                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                            bounded(64);
                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();

                        let (input_paths_tx, input_paths_rx) = unbounded();

                        let tasks = vec![
                            cx.spawn(Self::provide_search_paths(
                                std::mem::take(worktrees),
                                query.include_ignored(),
                                input_paths_tx,
                                sorted_search_results_tx,
                            ))
                            .boxed_local(),
                            Self::open_buffers(
                                &self.buffer_store,
                                get_buffer_for_full_scan_rx,
                                grab_buffer_snapshot_tx,
                                cx.clone(),
                            )
                            .boxed_local(),
                            cx.background_spawn(Self::maintain_sorted_search_results(
                                sorted_search_results_rx,
                                get_buffer_for_full_scan_tx.clone(),
                                self.limit,
                            ))
                            .boxed_local(),
                        ];
                        (
                            FindSearchCandidates::Local {
                                fs,
                                get_buffer_for_full_scan_tx,
                                confirm_contents_will_match_tx,
                                confirm_contents_will_match_rx,
                                input_paths_rx,
                            },
                            tasks,
                        )
                    }
                    SearchKind::Remote {
                        client,
                        remote_id,
                        models,
                    } => {
                        let request = client.request(proto::FindSearchCandidates {
                            project_id: remote_id,
                            query: Some(query.to_proto()),
                            limit: self.limit as _,
                        });
                        let Ok(guard) = cx.update(|cx| {
                            Project::retain_remotely_created_models_impl(
                                &models,
                                &self.buffer_store,
                                &self.worktree_store,
                                cx,
                            )
                        }) else {
                            return;
                        };
                        let buffer_store = self.buffer_store.downgrade();
                        let issue_remote_buffers_request = cx
                            .spawn(async move |cx| {
                                let _ = maybe!(async move {
                                    let response = request.await?;

                                    for buffer_id in response.buffer_ids {
                                        let buffer_id = BufferId::new(buffer_id)?;
                                        let buffer = buffer_store
                                            .update(cx, |buffer_store, cx| {
                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
                                            })?
                                            .await?;
                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
                                    }

                                    drop(guard);
                                    anyhow::Ok(())
                                })
                                .await
                                .log_err();
                            })
                            .boxed_local();
                        (
                            FindSearchCandidates::Remote,
                            vec![issue_remote_buffers_request],
                        )
                    }
                };

                let matches_count = AtomicUsize::new(0);
                let matched_buffer_count = AtomicUsize::new(0);

                let worker_pool = executor.scoped(|scope| {
                    let num_cpus = executor.num_cpus();

                    assert!(num_cpus > 0);
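                    // One worker per available core, minus one; each worker owns its own clones of
                    // the pipeline channels.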
                    for _ in 0..num_cpus - 1 {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &open_buffers,
                            matched_buffer_count: &matched_buffer_count,
                            matches_count: &matches_count,
                            candidates: candidate_searcher.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                            publish_matches: tx.clone(),
                        };
                        scope.spawn(worker.run());
                    }
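                    // Drop the originals so that each channel closes once the last worker drops
                    // its clones.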
                    drop(tx);
                    drop(find_all_matches_rx);
                    drop(candidate_searcher);
                });

                let buffer_snapshots = Self::grab_buffer_snapshots(
                    grab_buffer_snapshot_rx,
                    find_all_matches_tx,
                    cx.clone(),
                );
                futures::future::join_all(
                    [worker_pool.boxed_local(), buffer_snapshots.boxed_local()]
                        .into_iter()
                        .chain(tasks),
                )
                .await;
            })
        });

        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }

    fn provide_search_paths(
        worktrees: Vec<Entity<Worktree>>,
        include_ignored: bool,
        tx: Sender<InputPath>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> impl AsyncFnOnce(&mut AsyncApp) {
        async move |cx| {
            _ = maybe!(async move {
                for worktree in worktrees {
                    let (mut snapshot, worktree_settings) = worktree
                        .read_with(cx, |this, _| {
                            Some((this.snapshot(), this.as_local()?.settings()))
                        })?
                        .context("The worktree is not local")?;
                    if include_ignored {
                        // Pre-fetch all of the ignored directories as they're going to be searched.
                        let mut entries_to_refresh = vec![];
                        for entry in snapshot.entries(include_ignored, 0) {
                            if entry.is_ignored && entry.kind.is_unloaded() {
                                if !worktree_settings.is_path_excluded(&entry.path) {
                                    entries_to_refresh.push(entry.path.clone());
                                }
                            }
                        }
                        let barrier = worktree.update(cx, |this, _| {
                            let local = this.as_local_mut()?;
                            let barrier = entries_to_refresh
                                .into_iter()
                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
                                .collect::<Vec<_>>();
                            Some(barrier)
                        })?;
                        if let Some(barriers) = barrier {
                            futures::future::join_all(barriers).await;
                        }
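                        // Re-read the snapshot so it includes the entries that were just scanned in.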
                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
                    }
                    cx.background_executor()
                        .scoped(|scope| {
                            scope.spawn(async {
                                for entry in snapshot.files(include_ignored, 0) {
                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
                                    let Ok(_) = tx
                                        .send(InputPath {
                                            entry: entry.clone(),
                                            snapshot: snapshot.clone(),
                                            should_scan_tx,
                                        })
                                        .await
                                    else {
                                        return;
                                    };
                                    if results.send(should_scan_rx).await.is_err() {
                                        return;
                                    };
                                }
                            })
                        })
                        .await;
                }
                anyhow::Ok(())
            })
            .await;
        }
    }

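    /// Forwards paths whose contents were confirmed to contain a match, in the order the paths were
    /// produced (the per-path confirmations run concurrently, but we await them in sequence), until
    /// `limit` paths have been forwarded.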
    async fn maintain_sorted_search_results(
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
        limit: usize,
    ) {
        let mut rx = pin!(rx);
        let mut matched = 0;
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
            matched += 1;
            if matched >= limit {
                break;
            }
        }
    }

    /// Background workers cannot open buffers by themselves, hence the main thread does it on their behalf.
    async fn open_buffers(
        buffer_store: &Entity<BufferStore>,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<Entity<Buffer>>,
        mut cx: AsyncApp,
    ) {
        let mut rx = pin!(rx.ready_chunks(64));
        _ = maybe!(async move {
            while let Some(requested_paths) = rx.next().await {
                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
                    requested_paths
                        .into_iter()
                        .map(|path| this.open_buffer(path, cx))
                        .collect::<FuturesOrdered<_>>()
                })?;

                while let Some(buffer) = buffers.next().await {
                    if let Some(buffer) = buffer.log_err() {
                        find_all_matches_tx.send(buffer).await?;
                    }
                }
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

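    /// Pairs each incoming buffer handle with a [`BufferSnapshot`] so that the background workers
    /// can run the query against the snapshot.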
    async fn grab_buffer_snapshots(
        rx: Receiver<Entity<Buffer>>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(buffer) = rx.recv().await {
                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
        let worktree_store = self.worktree_store.read(cx);
        let mut buffers = search_query
            .buffers()
            .into_iter()
            .flatten()
            .filter(|buffer| {
                let b = buffer.read(cx);
                if let Some(file) = b.file() {
                    if !search_query.match_path(file.path().as_std_path()) {
                        return false;
                    }
                    if !search_query.include_ignored()
                        && let Some(entry) = b
                            .entry_id(cx)
                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
                        && entry.is_ignored
                    {
                        return false;
                    }
                }
                true
            })
            .cloned()
            .collect::<Vec<_>>();
        buffers.sort_by(|a, b| {
            let a = a.read(cx);
            let b = b.read(cx);
            match (a.file(), b.file()) {
                (None, None) => a.remote_id().cmp(&b.remote_id()),
                (None, Some(_)) => std::cmp::Ordering::Less,
                (Some(_), None) => std::cmp::Ordering::Greater,
                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
            }
        });

        buffers
    }
}

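/// One background search worker. Each worker holds clones of every pipeline channel and, in a loop,
/// services whichever stage currently has work available.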
struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    candidates: FindSearchCandidates,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
    async fn run(mut self) {
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            mut get_buffer_for_full_scan_tx,
            fs,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                fs,
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx,
                Some(fs),
            ),
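            // Remote and open-buffers-only searches don't run the local pipeline; give this worker
            // receivers whose senders are already dropped (they yield nothing) and senders whose
            // receivers are already dropped.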
            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => (
                unbounded().1,
                unbounded().1,
                unbounded().0,
                unbounded().0,
                None,
            ),
        };
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: fs.as_deref(),
                matched_buffer_count: self.matched_buffer_count,
                matches_count: self.matches_count,
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
                publish_matches: &self.publish_matches,
            };
            // Whenever we notice that some step of the pipeline is closed, we don't want to close
            // the subsequent steps straight away. Another worker might be about to produce a value
            // that will be pushed there, so we replace the current worker's pipe with a dummy one.
            // That way, we only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    if self.publish_matches.is_closed() {
                        break;
                    }
                    let Some(matches) = find_all_matches else {
                        self.publish_matches = bounded(1).0;
                        continue;
                    };
                    let result = handler.handle_find_all_matches(matches).await;
                    if let Some(_should_bail) = result {
                        self.publish_matches = bounded(1).0;
                        continue;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        get_buffer_for_full_scan_tx = bounded(1).0;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // The path stream is exhausted; swap in a dummy sender to drop our clone.
                        // Once every worker has done the same, the next stage's channel closes.
                        confirm_contents_will_match_tx = bounded(1).0;
                    }
                }
                complete => {
                    break
                },
            }
        }
    }
}

struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: Option<&'worker dyn Fs>,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

struct LimitReached;

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
    ) -> Option<LimitReached> {
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

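        // Charge this buffer and its matches against the global limits; once either cap is
        // exceeded, report `LimitReached` instead of the matches.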
        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
            > Search::MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > Search::MAX_SEARCH_RESULT_RANGES
        {
            _ = self.publish_matches.send(SearchResult::LimitReached).await;
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }

    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self
                .fs
                .context("Trying to query the filesystem in a remote project search")?
                .open_sync(&abs_path)
                .await
                .log_err()
            else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file contents, throw away files that have invalid UTF-8 sequences early on;
                // that way we can still match files in a streaming fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                return Ok(());
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

    async fn handle_scan_path(&self, req: InputPath) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                snapshot,
                should_scan_tx,
            } = req;

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

struct InputPath {
    entry: Entry,
    snapshot: Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}