project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use anyhow::Context;
 12use collections::HashSet;
 13use fs::Fs;
 14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
 15use gpui::{App, AsyncApp, Entity, Task};
 16use language::{Buffer, BufferSnapshot};
 17use postage::oneshot;
 18use smol::channel::{Receiver, Sender, bounded, unbounded};
 19
 20use util::{ResultExt, maybe};
 21use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
 22
 23use crate::{
 24    ProjectItem, ProjectPath,
 25    buffer_store::BufferStore,
 26    search::{SearchQuery, SearchResult},
 27};
 28
/// Inputs for a single project-wide search run; consumed by [`Search::into_results`].
pub(crate) struct Search {
    /// Filesystem handle; workers use it to open candidate files from disk.
    pub(crate) fs: Arc<dyn Fs>,
    /// Store that is consulted for already-open buffers and asked to open buffers for matching paths.
    pub(crate) buffer_store: Entity<BufferStore>,
    /// Worktrees whose files are fed into the search pipeline.
    pub(crate) worktrees: Vec<Entity<Worktree>>,
    /// Upper bound on the number of matching paths forwarded for a full scan.
    /// `into_results` lowers it to account for unnamed buffers, which are searched as well.
    pub(crate) limit: usize,
}
 35
/// Represents results of project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search.
///
/// The search task is only spawned once [`SearchResultsHandle::results`] or
/// [`SearchResultsHandle::matching_buffers`] is called; both consume the handle.
#[must_use]
pub(crate) struct SearchResultsHandle {
    /// Receiving end of the channel the workers publish `SearchResult`s to.
    results: Receiver<SearchResult>,
    /// Receiving end of the channel that carries unnamed buffers and buffers opened for paths
    /// selected for a full scan.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// Deferred closure that spawns the search task when invoked.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
 44
 45impl SearchResultsHandle {
 46    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
 47        (self.trigger_search)(cx).detach();
 48        self.results
 49    }
 50    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
 51        (self.trigger_search)(cx).detach();
 52        self.matching_buffers
 53    }
 54}
 55
 56impl Search {
    /// Cap on the number of fully scanned buffers; once the shared buffer counter exceeds it,
    /// workers publish `SearchResult::LimitReached` instead of further matches.
    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    /// Cap on the total number of matched ranges across all buffers; exceeding it likewise makes
    /// workers publish `SearchResult::LimitReached`.
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 59    /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
 60    /// or full search results.
 61    pub(crate) fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
 62        let mut open_buffers = HashSet::default();
 63        let mut unnamed_buffers = Vec::new();
 64        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 65        let buffers = self.buffer_store.read(cx);
 66        for handle in buffers.buffers() {
 67            let buffer = handle.read(cx);
 68            if !buffers.is_searchable(&buffer.remote_id()) {
 69                continue;
 70            } else if let Some(entry_id) = buffer.entry_id(cx) {
 71                open_buffers.insert(entry_id);
 72            } else {
 73                self.limit -= self.limit.saturating_sub(1);
 74                unnamed_buffers.push(handle)
 75            };
 76        }
 77        let executor = cx.background_executor().clone();
 78        let (tx, rx) = unbounded();
 79        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
 80        let matching_buffers = grab_buffer_snapshot_rx.clone();
 81        let trigger_search = Box::new(|cx: &mut App| {
 82            cx.spawn(async move |cx| {
 83                for buffer in unnamed_buffers {
 84                    _ = grab_buffer_snapshot_tx.send(buffer).await;
 85                }
 86
 87                let (find_all_matches_tx, find_all_matches_rx) =
 88                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
 89
 90                let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
 91                let matches_count = AtomicUsize::new(0);
 92                let matched_buffer_count = AtomicUsize::new(0);
 93                let (input_paths_tx, input_paths_rx) = unbounded();
 94                let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
 95                let worker_pool = executor.scoped(|scope| {
 96                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
 97                        bounded(64);
 98
 99                    let num_cpus = executor.num_cpus();
100
101                    assert!(num_cpus > 0);
102                    for _ in 0..executor.num_cpus() - 1 {
103                        let worker = Worker {
104                            query: &query,
105                            open_buffers: &open_buffers,
106                            matched_buffer_count: &matched_buffer_count,
107                            matches_count: &matches_count,
108                            fs: &*self.fs,
109                            input_paths_rx: input_paths_rx.clone(),
110                            confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
111                            confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
112                            get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
113                            find_all_matches_rx: find_all_matches_rx.clone(),
114                            publish_matches: tx.clone(),
115                        };
116                        scope.spawn(worker.run());
117                    }
118                    drop(tx);
119                    drop(find_all_matches_rx);
120
121                    scope.spawn(Self::maintain_sorted_search_results(
122                        sorted_search_results_rx,
123                        get_buffer_for_full_scan_tx,
124                        self.limit,
125                    ))
126                });
127                let provide_search_paths = cx.spawn(Self::provide_search_paths(
128                    std::mem::take(&mut self.worktrees),
129                    query.include_ignored(),
130                    input_paths_tx,
131                    sorted_search_results_tx,
132                ));
133                let open_buffers = self.open_buffers(
134                    get_buffer_for_full_scan_rx,
135                    grab_buffer_snapshot_tx,
136                    cx.clone(),
137                );
138                let buffer_snapshots = self.grab_buffer_snapshots(
139                    grab_buffer_snapshot_rx,
140                    find_all_matches_tx,
141                    cx.clone(),
142                );
143                futures::future::join4(
144                    worker_pool,
145                    buffer_snapshots,
146                    open_buffers,
147                    provide_search_paths,
148                )
149                .await;
150            })
151        });
152        SearchResultsHandle {
153            results: rx,
154            matching_buffers,
155            trigger_search,
156        }
157    }
158
159    fn provide_search_paths(
160        worktrees: Vec<Entity<Worktree>>,
161        include_ignored: bool,
162        tx: Sender<InputPath>,
163        results: Sender<oneshot::Receiver<ProjectPath>>,
164    ) -> impl AsyncFnOnce(&mut AsyncApp) {
165        async move |cx| {
166            _ = maybe!(async move {
167                for worktree in worktrees {
168                    let (mut snapshot, worktree_settings) = worktree
169                        .read_with(cx, |this, _| {
170                            Some((this.snapshot(), this.as_local()?.settings()))
171                        })?
172                        .context("The worktree is not local")?;
173                    if include_ignored {
174                        // Pre-fetch all of the ignored directories as they're going to be searched.
175                        let mut entries_to_refresh = vec![];
176                        for entry in snapshot.entries(include_ignored, 0) {
177                            if entry.is_ignored && entry.kind.is_unloaded() {
178                                if !worktree_settings.is_path_excluded(&entry.path) {
179                                    entries_to_refresh.push(entry.path.clone());
180                                }
181                            }
182                        }
183                        let barrier = worktree.update(cx, |this, _| {
184                            let local = this.as_local_mut()?;
185                            let barrier = entries_to_refresh
186                                .into_iter()
187                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
188                                .collect::<Vec<_>>();
189                            Some(barrier)
190                        })?;
191                        if let Some(barriers) = barrier {
192                            futures::future::join_all(barriers).await;
193                        }
194                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
195                    }
196                    cx.background_executor()
197                        .scoped(|scope| {
198                            scope.spawn(async {
199                                for entry in snapshot.files(include_ignored, 0) {
200                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
201                                    let Ok(_) = tx
202                                        .send(InputPath {
203                                            entry: entry.clone(),
204                                            snapshot: snapshot.clone(),
205                                            should_scan_tx,
206                                        })
207                                        .await
208                                    else {
209                                        return;
210                                    };
211                                    if results.send(should_scan_rx).await.is_err() {
212                                        return;
213                                    };
214                                }
215                            })
216                        })
217                        .await;
218                }
219                anyhow::Ok(())
220            })
221            .await;
222        }
223    }
224
225    async fn maintain_sorted_search_results(
226        rx: Receiver<oneshot::Receiver<ProjectPath>>,
227        paths_for_full_scan: Sender<ProjectPath>,
228        limit: usize,
229    ) {
230        let mut rx = pin!(rx);
231        let mut matched = 0;
232        while let Some(mut next_path_result) = rx.next().await {
233            let Some(successful_path) = next_path_result.next().await else {
234                // This math did not produce a match, hence skip it.
235                continue;
236            };
237            if paths_for_full_scan.send(successful_path).await.is_err() {
238                return;
239            };
240            matched += 1;
241            if matched >= limit {
242                break;
243            }
244        }
245    }
246
247    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
248    async fn open_buffers(
249        &self,
250        rx: Receiver<ProjectPath>,
251        find_all_matches_tx: Sender<Entity<Buffer>>,
252        mut cx: AsyncApp,
253    ) {
254        let mut rx = pin!(rx.ready_chunks(64));
255        _ = maybe!(async move {
256            while let Some(requested_paths) = rx.next().await {
257                let mut buffers = self.buffer_store.update(&mut cx, |this, cx| {
258                    requested_paths
259                        .into_iter()
260                        .map(|path| this.open_buffer(path, cx))
261                        .collect::<FuturesOrdered<_>>()
262                })?;
263
264                while let Some(buffer) = buffers.next().await {
265                    if let Some(buffer) = buffer.log_err() {
266                        find_all_matches_tx.send(buffer).await?;
267                    }
268                }
269            }
270            Result::<_, anyhow::Error>::Ok(())
271        })
272        .await;
273    }
274
275    async fn grab_buffer_snapshots(
276        &self,
277        rx: Receiver<Entity<Buffer>>,
278        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
279        mut cx: AsyncApp,
280    ) {
281        _ = maybe!(async move {
282            while let Ok(buffer) = rx.recv().await {
283                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
284                find_all_matches_tx.send((buffer, snapshot)).await?;
285            }
286            Result::<_, anyhow::Error>::Ok(())
287        })
288        .await;
289    }
290}
291
/// One worker of the search pipeline. Every worker holds clones of the same channels and services
/// whichever stage currently has work for it (see `Worker::run`).
struct Worker<'search> {
    /// The query being evaluated.
    query: &'search SearchQuery,
    /// Shared counter of fully scanned buffers; checked against `Search::MAX_SEARCH_RESULT_FILES`.
    matched_buffer_count: &'search AtomicUsize,
    /// Shared counter of matched ranges; checked against `Search::MAX_SEARCH_RESULT_RANGES`.
    matches_count: &'search AtomicUsize,
    /// Project entries that already have an open buffer.
    open_buffers: &'search HashSet<ProjectEntryId>,
    /// Filesystem used to open candidate files from disk.
    fs: &'search dyn Fs,
    /// Start off with all paths in project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents)
    input_paths_rx: Receiver<InputPath>,

    /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
    /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    /// Receiving end of the same stage; any worker may pick up an entry another worker queued.
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}
318
impl Worker<'_> {
    /// Runs this worker until all three of its input streams are exhausted, or until the
    /// results channel is observed to be closed.
    ///
    /// Each loop iteration handles at most one item. `select_biased!` polls its arms in the order
    /// they are written, so the later pipeline stages (full scan, then first-match check) take
    /// priority over scanning new paths.
    async fn run(mut self) {
        // Fused streams are what lets the `complete` arm below fire once all of them have ended.
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());

        loop {
            // Rebuilt on every iteration because the senders it borrows may have been swapped
            // out for dummies in a previous iteration.
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: self.fs,
                matched_buffer_count: self.matched_buffer_count,
                matches_count: self.matches_count,
                confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
                publish_matches: &self.publish_matches,
            };
            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
            // steps straight away. Another worker might be about to produce a value that will
            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
            // That way, we'll only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    // Nobody is listening for results anymore (or we already swapped in a dummy).
                    if self.publish_matches.is_closed() {
                        break;
                    }
                    let Some(matches) = find_all_matches else {
                        // No more buffers to scan: release our clone of the results sender.
                        // `bounded(1).0` keeps only the sender of a fresh channel; its receiver
                        // is dropped right away.
                        self.publish_matches = bounded(1).0;
                        continue;
                    };
                    let result = handler.handle_find_all_matches(matches).await;
                    if let Some(_should_bail) = result {

                        // A result limit was hit: stop publishing from this worker.
                        self.publish_matches = bounded(1).0;
                        continue;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        // First-match stage is exhausted: release our clone of the next stage's sender.
                        self.get_buffer_for_full_scan_tx = bounded(1).0;
                    }

                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this is not producing values, close the upstream.
                        self.confirm_contents_will_match_tx = bounded(1).0;
                    }

                 }
                 // All three streams have terminated; there is nothing left to do.
                 complete => {
                     break
                },

            }
        }
    }
}
381
/// Short-lived view over a [`Worker`]'s state that implements the individual pipeline steps.
/// It borrows the worker's current senders, so it reflects any sender the worker has replaced.
struct RequestHandler<'worker> {
    /// The query being evaluated.
    query: &'worker SearchQuery,
    /// Filesystem used to open candidate files from disk.
    fs: &'worker dyn Fs,
    /// Project entries that already have an open buffer.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Shared counter of fully scanned buffers.
    matched_buffer_count: &'worker AtomicUsize,
    /// Shared counter of matched ranges.
    matches_count: &'worker AtomicUsize,

    /// Queue of entries whose on-disk contents still need to be checked for a first match.
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    /// Queue of paths for which a buffer should be opened and fully scanned.
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    /// Channel the final search results are published to.
    publish_matches: &'worker Sender<SearchResult>,
}
393
/// Marker returned by `RequestHandler::handle_find_all_matches` when one of the result limits
/// has been exceeded.
struct LimitReached;
395
396impl RequestHandler<'_> {
397    async fn handle_find_all_matches(
398        &self,
399        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
400    ) -> Option<LimitReached> {
401        let ranges = self
402            .query
403            .search(&snapshot, None)
404            .await
405            .iter()
406            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
407            .collect::<Vec<_>>();
408
409        let matched_ranges = ranges.len();
410        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
411            > Search::MAX_SEARCH_RESULT_FILES
412            || self
413                .matches_count
414                .fetch_add(matched_ranges, Ordering::Release)
415                > Search::MAX_SEARCH_RESULT_RANGES
416        {
417            _ = self.publish_matches.send(SearchResult::LimitReached).await;
418            Some(LimitReached)
419        } else {
420            _ = self
421                .publish_matches
422                .send(SearchResult::Buffer { buffer, ranges })
423                .await;
424            None
425        }
426    }
427    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
428        _=maybe!(async move {
429            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
430            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
431                return anyhow::Ok(());
432            };
433
434            let mut file = BufReader::new(file);
435            let file_start = file.fill_buf()?;
436
437            if let Err(Some(starting_position)) =
438            std::str::from_utf8(file_start).map_err(|e| e.error_len())
439            {
440                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
441                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
442                log::debug!(
443                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
444                );
445                return Ok(());
446            }
447
448            if self.query.detect(file).unwrap_or(false) {
449                // Yes, we should scan the whole file.
450                entry.should_scan_tx.send(entry.path).await?;
451            }
452            Ok(())
453        }).await;
454    }
455
456    async fn handle_scan_path(&self, req: InputPath) {
457        _ = maybe!(async move {
458            let InputPath {
459                entry,
460
461                snapshot,
462                should_scan_tx,
463            } = req;
464
465            if entry.is_fifo || !entry.is_file() {
466                return Ok(());
467            }
468
469            if self.query.filters_path() {
470                let matched_path = if self.query.match_full_paths() {
471                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
472                    full_path.push(entry.path.as_std_path());
473                    self.query.match_path(&full_path)
474                } else {
475                    self.query.match_path(entry.path.as_std_path())
476                };
477                if !matched_path {
478                    return Ok(());
479                }
480            }
481
482            if self.open_entries.contains(&entry.id) {
483                // The buffer is already in memory and that's the version we want to scan;
484                // hence skip the dilly-dally and look for all matches straight away.
485                self.get_buffer_for_full_scan_tx
486                    .send(ProjectPath {
487                        worktree_id: snapshot.id(),
488                        path: entry.path.clone(),
489                    })
490                    .await?;
491            } else {
492                self.confirm_contents_will_match_tx
493                    .send(MatchingEntry {
494                        should_scan_tx: should_scan_tx,
495                        worktree_root: snapshot.abs_path().clone(),
496                        path: ProjectPath {
497                            worktree_id: snapshot.id(),
498                            path: entry.path.clone(),
499                        },
500                    })
501                    .await?;
502            }
503
504            anyhow::Ok(())
505        })
506        .await;
507    }
508}
509
/// A file from a worktree walk, queued for path-level filtering by a worker.
struct InputPath {
    /// The worktree entry describing the file.
    entry: Entry,
    /// Snapshot of the worktree the entry was taken from.
    snapshot: Snapshot,
    /// Oneshot sender used to report the file's path if it should be fully scanned.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
515
/// A file that passed path-level filtering and is waiting for its on-disk contents to be checked
/// for at least one match.
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path` to locate the file on disk.
    worktree_root: Arc<Path>,
    /// Project-relative identity of the file.
    path: ProjectPath,
    /// Oneshot sender used to report `path` once a match has been detected.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}