project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use anyhow::Context;
 12use collections::HashSet;
 13use fs::Fs;
 14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
 15use gpui::{App, AppContext, AsyncApp, Entity, Task};
 16use language::{Buffer, BufferSnapshot};
 17use parking_lot::Mutex;
 18use postage::oneshot;
 19use rpc::{AnyProtoClient, proto};
 20use smol::{
 21    channel::{Receiver, Sender, bounded, unbounded},
 22    future::FutureExt,
 23};
 24
 25use text::BufferId;
 26use util::{ResultExt, maybe};
 27use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
 28
 29use crate::{
 30    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
 31    buffer_store::BufferStore,
 32    search::{SearchQuery, SearchResult},
 33    worktree_store::WorktreeStore,
 34};
 35
 36pub(crate) struct Search {
 37    pub(crate) fs: Arc<dyn Fs>,
 38    pub(crate) buffer_store: Entity<BufferStore>,
 39    pub(crate) worktree_store: Entity<WorktreeStore>,
 40    pub(crate) worktrees: Vec<Entity<Worktree>>,
 41    pub(crate) limit: usize,
 42    pub(crate) client: Option<(AnyProtoClient, u64)>,
 43    pub(crate) remotely_created_models: Arc<Mutex<RemotelyCreatedModels>>,
 44}
 45
 46/// Represents results of project search and allows one to either obtain match positions OR
 47/// just the handles to buffers that may match the search.
 48#[must_use]
 49pub(crate) struct SearchResultsHandle {
 50    results: Receiver<SearchResult>,
 51    matching_buffers: Receiver<Entity<Buffer>>,
 52    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
 53}
 54
 55impl SearchResultsHandle {
 56    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
 57        (self.trigger_search)(cx).detach();
 58        self.results
 59    }
 60    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
 61        (self.trigger_search)(cx).detach();
 62        self.matching_buffers
 63    }
 64}
 65
 66#[derive(Clone)]
 67enum FindSearchCandidates {
 68    Local {
 69        /// Start off with all paths in project and filter them based on:
 70        /// - Include filters
 71        /// - Exclude filters
 72        /// - Only open buffers
 73        /// - Scan ignored files
 74        /// Put another way: filter out files that can't match (without looking at file contents)
 75        input_paths_rx: Receiver<InputPath>,
 76        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
 77        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
 78        confirm_contents_will_match_tx: Sender<MatchingEntry>,
 79        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
 80        /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
 81        /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
 82        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
 83    },
 84    Remote,
 85}
 86
 87impl Search {
 88    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 89    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 90    /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
 91    /// or full search results.
 92    pub(crate) fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
 93        let mut open_buffers = HashSet::default();
 94        let mut unnamed_buffers = Vec::new();
 95        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 96        let buffers = self.buffer_store.read(cx);
 97        for handle in buffers.buffers() {
 98            let buffer = handle.read(cx);
 99            if !buffers.is_searchable(&buffer.remote_id()) {
100                continue;
101            } else if let Some(entry_id) = buffer.entry_id(cx) {
102                open_buffers.insert(entry_id);
103            } else {
104                self.limit -= self.limit.saturating_sub(1);
105                unnamed_buffers.push(handle)
106            };
107        }
108        let executor = cx.background_executor().clone();
109        let (tx, rx) = unbounded();
110        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
111        let matching_buffers = grab_buffer_snapshot_rx.clone();
112        let trigger_search = Box::new(|cx: &mut App| {
113            cx.spawn(async move |cx| {
114                for buffer in unnamed_buffers {
115                    _ = grab_buffer_snapshot_tx.send(buffer).await;
116                }
117
118                let (find_all_matches_tx, find_all_matches_rx) =
119                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
120
121                let (candidate_searcher, tasks) = if let Some((client, remote_id)) = self.client {
122                    let request = client.request(proto::FindSearchCandidates {
123                        project_id: remote_id,
124                        query: Some(query.to_proto()),
125                        limit: self.limit as _,
126                    });
127                    let Ok(guard) = cx.update(|cx| {
128                        Project::retain_remotely_created_models_impl(
129                            &self.remotely_created_models,
130                            &self.buffer_store,
131                            &self.worktree_store,
132                            cx,
133                        )
134                    }) else {
135                        return;
136                    };
137                    let buffer_store = self.buffer_store.downgrade();
138                    let issue_remote_buffers_request = cx
139                        .spawn(async move |cx| {
140                            let _ = maybe!(async move {
141                                let response = request.await?;
142
143                                for buffer_id in response.buffer_ids {
144                                    let buffer_id = BufferId::new(buffer_id)?;
145                                    let buffer = buffer_store
146                                        .update(cx, |buffer_store, cx| {
147                                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
148                                        })?
149                                        .await?;
150                                    let _ = grab_buffer_snapshot_tx.send(buffer).await;
151                                }
152
153                                drop(guard);
154                                anyhow::Ok(())
155                            })
156                            .await
157                            .log_err();
158                        })
159                        .boxed_local();
160                    (
161                        FindSearchCandidates::Remote,
162                        vec![issue_remote_buffers_request],
163                    )
164                } else {
165                    let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
166                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
167                        bounded(64);
168                    let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
169
170                    let (input_paths_tx, input_paths_rx) = unbounded();
171                    let tasks = vec![
172                        cx.spawn(Self::provide_search_paths(
173                            std::mem::take(&mut self.worktrees),
174                            query.include_ignored(),
175                            input_paths_tx,
176                            sorted_search_results_tx,
177                        ))
178                        .boxed_local(),
179                        self.open_buffers(
180                            get_buffer_for_full_scan_rx,
181                            grab_buffer_snapshot_tx,
182                            cx.clone(),
183                        )
184                        .boxed_local(),
185                        cx.background_spawn(Self::maintain_sorted_search_results(
186                            sorted_search_results_rx,
187                            get_buffer_for_full_scan_tx.clone(),
188                            self.limit,
189                        ))
190                        .boxed_local(),
191                    ];
192                    (
193                        FindSearchCandidates::Local {
194                            get_buffer_for_full_scan_tx,
195                            confirm_contents_will_match_tx,
196                            confirm_contents_will_match_rx,
197                            input_paths_rx,
198                        },
199                        tasks,
200                    )
201                };
202
203                let matches_count = AtomicUsize::new(0);
204                let matched_buffer_count = AtomicUsize::new(0);
205
206                let worker_pool = executor.scoped(|scope| {
207                    let num_cpus = executor.num_cpus();
208
209                    assert!(num_cpus > 0);
210                    for _ in 0..executor.num_cpus() - 1 {
211                        let worker = Worker {
212                            query: &query,
213                            open_buffers: &open_buffers,
214                            matched_buffer_count: &matched_buffer_count,
215                            matches_count: &matches_count,
216                            fs: &*self.fs,
217                            candidates: candidate_searcher.clone(),
218                            find_all_matches_rx: find_all_matches_rx.clone(),
219                            publish_matches: tx.clone(),
220                        };
221                        scope.spawn(worker.run());
222                    }
223                    drop(tx);
224                    drop(find_all_matches_rx);
225                    drop(candidate_searcher);
226                });
227
228                let buffer_snapshots = Self::grab_buffer_snapshots(
229                    grab_buffer_snapshot_rx,
230                    find_all_matches_tx,
231                    cx.clone(),
232                );
233                futures::future::join_all(
234                    [worker_pool.boxed_local(), buffer_snapshots.boxed_local()]
235                        .into_iter()
236                        .chain(tasks),
237                )
238                .await;
239            })
240        });
241
242        SearchResultsHandle {
243            results: rx,
244            matching_buffers,
245            trigger_search,
246        }
247    }
248
249    fn provide_search_paths(
250        worktrees: Vec<Entity<Worktree>>,
251        include_ignored: bool,
252        tx: Sender<InputPath>,
253        results: Sender<oneshot::Receiver<ProjectPath>>,
254    ) -> impl AsyncFnOnce(&mut AsyncApp) {
255        async move |cx| {
256            _ = maybe!(async move {
257                for worktree in worktrees {
258                    let (mut snapshot, worktree_settings) = worktree
259                        .read_with(cx, |this, _| {
260                            Some((this.snapshot(), this.as_local()?.settings()))
261                        })?
262                        .context("The worktree is not local")?;
263                    if include_ignored {
264                        // Pre-fetch all of the ignored directories as they're going to be searched.
265                        let mut entries_to_refresh = vec![];
266                        for entry in snapshot.entries(include_ignored, 0) {
267                            if entry.is_ignored && entry.kind.is_unloaded() {
268                                if !worktree_settings.is_path_excluded(&entry.path) {
269                                    entries_to_refresh.push(entry.path.clone());
270                                }
271                            }
272                        }
273                        let barrier = worktree.update(cx, |this, _| {
274                            let local = this.as_local_mut()?;
275                            let barrier = entries_to_refresh
276                                .into_iter()
277                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
278                                .collect::<Vec<_>>();
279                            Some(barrier)
280                        })?;
281                        if let Some(barriers) = barrier {
282                            futures::future::join_all(barriers).await;
283                        }
284                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
285                    }
286                    cx.background_executor()
287                        .scoped(|scope| {
288                            scope.spawn(async {
289                                for entry in snapshot.files(include_ignored, 0) {
290                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
291                                    let Ok(_) = tx
292                                        .send(InputPath {
293                                            entry: entry.clone(),
294                                            snapshot: snapshot.clone(),
295                                            should_scan_tx,
296                                        })
297                                        .await
298                                    else {
299                                        return;
300                                    };
301                                    if results.send(should_scan_rx).await.is_err() {
302                                        return;
303                                    };
304                                }
305                            })
306                        })
307                        .await;
308                }
309                anyhow::Ok(())
310            })
311            .await;
312        }
313    }
314
315    async fn maintain_sorted_search_results(
316        rx: Receiver<oneshot::Receiver<ProjectPath>>,
317        paths_for_full_scan: Sender<ProjectPath>,
318        limit: usize,
319    ) {
320        let mut rx = pin!(rx);
321        let mut matched = 0;
322        while let Some(mut next_path_result) = rx.next().await {
323            let Some(successful_path) = next_path_result.next().await else {
324                // This math did not produce a match, hence skip it.
325                continue;
326            };
327            if paths_for_full_scan.send(successful_path).await.is_err() {
328                return;
329            };
330            matched += 1;
331            if matched >= limit {
332                break;
333            }
334        }
335    }
336
337    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
338    async fn open_buffers(
339        &self,
340        rx: Receiver<ProjectPath>,
341        find_all_matches_tx: Sender<Entity<Buffer>>,
342        mut cx: AsyncApp,
343    ) {
344        let mut rx = pin!(rx.ready_chunks(64));
345        _ = maybe!(async move {
346            while let Some(requested_paths) = rx.next().await {
347                let mut buffers = self.buffer_store.update(&mut cx, |this, cx| {
348                    requested_paths
349                        .into_iter()
350                        .map(|path| this.open_buffer(path, cx))
351                        .collect::<FuturesOrdered<_>>()
352                })?;
353
354                while let Some(buffer) = buffers.next().await {
355                    if let Some(buffer) = buffer.log_err() {
356                        find_all_matches_tx.send(buffer).await?;
357                    }
358                }
359            }
360            Result::<_, anyhow::Error>::Ok(())
361        })
362        .await;
363    }
364
365    async fn grab_buffer_snapshots(
366        rx: Receiver<Entity<Buffer>>,
367        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
368        mut cx: AsyncApp,
369    ) {
370        _ = maybe!(async move {
371            while let Ok(buffer) = rx.recv().await {
372                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
373                find_all_matches_tx.send((buffer, snapshot)).await?;
374            }
375            Result::<_, anyhow::Error>::Ok(())
376        })
377        .await;
378    }
379}
380
381struct Worker<'search> {
382    query: &'search SearchQuery,
383    matched_buffer_count: &'search AtomicUsize,
384    matches_count: &'search AtomicUsize,
385    open_buffers: &'search HashSet<ProjectEntryId>,
386    fs: &'search dyn Fs,
387
388    candidates: FindSearchCandidates,
389    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
390    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
391    /// Cool, we have results; let's share them with the world.
392    publish_matches: Sender<SearchResult>,
393}
394
impl Worker<'_> {
    /// Runs this worker until there is no more work, or until it notices that publishing
    /// results is no longer possible.
    ///
    /// Each loop iteration handles one job from whichever pipeline stage has one ready.
    /// `select_biased!` checks the stages in the order they are listed: full matching first,
    /// then on-disk first-match checks, then new input paths.
    async fn run(mut self) {
        // Take this worker's channel ends. A remote search has no local candidate stages, so
        // stand-in channels are created whose other halves are dropped immediately. The two
        // receivers therefore end at once, and the two senders are never used.
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            mut get_buffer_for_full_scan_tx,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx,
            ),
            FindSearchCandidates::Remote => {
                (unbounded().1, unbounded().1, unbounded().0, unbounded().0)
            }
        };
        // Fused streams let `select_biased!` skip exhausted stages and take the `complete`
        // branch once all three are done.
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            // Rebuilt every iteration: the senders it borrows may have been replaced with
            // dummies during the previous iteration.
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: self.fs,
                matched_buffer_count: self.matched_buffer_count,
                matches_count: self.matches_count,
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
                publish_matches: &self.publish_matches,
            };
            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
            // steps straight away. Another worker might be about to produce a value that will
            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
            // That way, we'll only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {

                    // Publishing is impossible: either every results receiver is gone, or this
                    // worker already swapped in a dummy sender (e.g. after the limit was hit).
                    if self.publish_matches.is_closed() {
                        break;
                    }
                    let Some(matches) = find_all_matches else {
                        // No more buffers to match: drop this worker's results sender.
                        self.publish_matches = bounded(1).0;
                        continue;
                    };
                    let result = handler.handle_find_all_matches(matches).await;
                    if let Some(_should_bail) = result {

                        // Limit reached: drop this worker's results sender; the next buffer
                        // that arrives makes this worker stop.
                        self.publish_matches = bounded(1).0;
                        continue;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        // The content-check stage is drained, so input paths are exhausted too.
                        // This worker no longer needs its full-scan sender (only path handling
                        // uses it), so drop it.
                        get_buffer_for_full_scan_tx = bounded(1).0;
                    }

                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // Input is exhausted: drop this worker's sender for the next
                        // (content-check) stage. That channel closes only once every worker
                        // has done the same.
                        confirm_contents_will_match_tx = bounded(1).0;
                    }

                }
                complete => {
                    break
                },

            }
        }
    }
}
479
480struct RequestHandler<'worker> {
481    query: &'worker SearchQuery,
482    fs: &'worker dyn Fs,
483    open_entries: &'worker HashSet<ProjectEntryId>,
484    matched_buffer_count: &'worker AtomicUsize,
485    matches_count: &'worker AtomicUsize,
486
487    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
488    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
489    publish_matches: &'worker Sender<SearchResult>,
490}
491
/// Marker returned by `RequestHandler::handle_find_all_matches` after it has published
/// `SearchResult::LimitReached` instead of a buffer's matches.
struct LimitReached;
493
494impl RequestHandler<'_> {
495    async fn handle_find_all_matches(
496        &self,
497        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
498    ) -> Option<LimitReached> {
499        let ranges = self
500            .query
501            .search(&snapshot, None)
502            .await
503            .iter()
504            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
505            .collect::<Vec<_>>();
506
507        let matched_ranges = ranges.len();
508        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
509            > Search::MAX_SEARCH_RESULT_FILES
510            || self
511                .matches_count
512                .fetch_add(matched_ranges, Ordering::Release)
513                > Search::MAX_SEARCH_RESULT_RANGES
514        {
515            _ = self.publish_matches.send(SearchResult::LimitReached).await;
516            Some(LimitReached)
517        } else {
518            _ = self
519                .publish_matches
520                .send(SearchResult::Buffer { buffer, ranges })
521                .await;
522            None
523        }
524    }
525    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
526        _=maybe!(async move {
527            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
528            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
529                return anyhow::Ok(());
530            };
531
532            let mut file = BufReader::new(file);
533            let file_start = file.fill_buf()?;
534
535            if let Err(Some(starting_position)) =
536            std::str::from_utf8(file_start).map_err(|e| e.error_len())
537            {
538                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
539                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
540                log::debug!(
541                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
542                );
543                return Ok(());
544            }
545
546            if self.query.detect(file).unwrap_or(false) {
547                // Yes, we should scan the whole file.
548                entry.should_scan_tx.send(entry.path).await?;
549            }
550            Ok(())
551        }).await;
552    }
553
554    async fn handle_scan_path(&self, req: InputPath) {
555        _ = maybe!(async move {
556            let InputPath {
557                entry,
558
559                snapshot,
560                should_scan_tx,
561            } = req;
562
563            if entry.is_fifo || !entry.is_file() {
564                return Ok(());
565            }
566
567            if self.query.filters_path() {
568                let matched_path = if self.query.match_full_paths() {
569                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
570                    full_path.push(entry.path.as_std_path());
571                    self.query.match_path(&full_path)
572                } else {
573                    self.query.match_path(entry.path.as_std_path())
574                };
575                if !matched_path {
576                    return Ok(());
577                }
578            }
579
580            if self.open_entries.contains(&entry.id) {
581                // The buffer is already in memory and that's the version we want to scan;
582                // hence skip the dilly-dally and look for all matches straight away.
583                self.get_buffer_for_full_scan_tx
584                    .send(ProjectPath {
585                        worktree_id: snapshot.id(),
586                        path: entry.path.clone(),
587                    })
588                    .await?;
589            } else {
590                self.confirm_contents_will_match_tx
591                    .send(MatchingEntry {
592                        should_scan_tx: should_scan_tx,
593                        worktree_root: snapshot.abs_path().clone(),
594                        path: ProjectPath {
595                            worktree_id: snapshot.id(),
596                            path: entry.path.clone(),
597                        },
598                    })
599                    .await?;
600            }
601
602            anyhow::Ok(())
603        })
604        .await;
605    }
606}
607
608struct InputPath {
609    entry: Entry,
610    snapshot: Snapshot,
611    should_scan_tx: oneshot::Sender<ProjectPath>,
612}
613
614struct MatchingEntry {
615    worktree_root: Arc<Path>,
616    path: ProjectPath,
617    should_scan_tx: oneshot::Sender<ProjectPath>,
618}