project_search.rs

  1use std::{
  2    ops::Range,
  3    pin::{Pin, pin},
  4};
  5
  6use futures::{FutureExt, StreamExt, future::BoxFuture, select_biased};
  7use gpui::{App, AsyncApp, Entity, WeakEntity};
  8use language::{Buffer, BufferSnapshot};
  9use smol::channel::{Receiver, Sender, bounded, unbounded};
 10use text::Anchor;
 11use util::{ResultExt, maybe};
 12use worktree::{Entry, Snapshot, WorktreeSettings};
 13
 14use crate::{
 15    ProjectPath,
 16    buffer_store::BufferStore,
 17    search::{SearchQuery, SearchResult},
 18};
 19
/// Searches the entries of a set of worktree snapshots for a query.
///
/// Consumed by [`ProjectSearcher::search`], which takes `self` by value.
pub(crate) struct ProjectSearcher {
    /// Weak handle to the buffer store; used to open buffers for paths that background
    /// workers request a full scan of.
    buffer_store: WeakEntity<BufferStore>,
    /// Worktree snapshots whose entries are fed into the search, each paired with worktree
    /// settings (the settings are not consulted anywhere in this file yet).
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
}
 24
 25impl ProjectSearcher {
 26    pub(crate) fn search(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
 27        let executor = cx.background_executor().clone();
 28        let (tx, rx) = unbounded();
 29        cx.spawn(async move |cx| {
 30            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 31            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
 32            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 33                bounded(MAX_CONCURRENT_BUFFER_OPENS);
 34            let worker_pool = executor.scoped(|scope| {
 35                let (input_paths_tx, input_paths_rx) = bounded(64);
 36                let (find_first_match_tx, find_first_match_rx) = bounded(64);
 37
 38                for _ in 0..executor.num_cpus() {
 39                    let worker = Worker {
 40                        query: &query,
 41                        input_paths_rx: input_paths_rx.clone(),
 42                        find_first_match_rx: find_first_match_rx.clone(),
 43                        find_first_match_tx: find_first_match_tx.clone(),
 44                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
 45                        find_all_matches_rx: find_all_matches_rx.clone(),
 46                        publish_matches: todo!(),
 47                    };
 48                    scope.spawn(worker.run());
 49                }
 50                scope.spawn(self.provide_search_paths(&query, input_paths_tx))
 51            });
 52            self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx)
 53                .await;
 54            worker_pool.await;
 55        })
 56        .detach();
 57        rx
 58    }
 59
 60    async fn provide_search_paths<'a>(&'a self, query: &SearchQuery, tx: Sender<&'a Entry>) {
 61        for (snapshot, _) in &self.snapshots {
 62            for entry in snapshot.entries(query.include_ignored(), 0) {
 63                let Ok(_) = tx.send(entry).await else {
 64                    return;
 65                };
 66            }
 67        }
 68    }
 69
 70    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
 71    async fn open_buffers<'a>(
 72        &'a self,
 73        rx: Receiver<ProjectPath>,
 74        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
 75        cx: &mut AsyncApp,
 76    ) {
 77        _ = maybe!(async move {
 78            while let Ok(requested_path) = rx.recv().await {
 79                let Some(buffer) = self
 80                    .buffer_store
 81                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
 82                    .await
 83                    .log_err()
 84                else {
 85                    continue;
 86                };
 87                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
 88                find_all_matches_tx.send((buffer, snapshot)).await?;
 89            }
 90            Result::<_, anyhow::Error>::Ok(())
 91        })
 92        .await;
 93    }
 94}
 95
 96struct Worker<'search> {
 97    query: &'search SearchQuery,
 98    /// Start off with all paths in project and filter them based on:
 99    /// - Include filters
100    /// - Exclude filters
101    /// - Only open buffers
102    /// - Scan ignored files
103    /// Put another way: filter out files that can't match (without looking at file contents)
104    input_paths_rx: Receiver<&'search Entry>,
105    /// After that, figure out which paths contain at least one match (look at file contents). That's called "partial scan".
106    find_first_match_tx: Sender<()>,
107    find_first_match_rx: Receiver<()>,
108    /// Of those that contain at least one match, look for rest of matches (and figure out their ranges).
109    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
110    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
111    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
112    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
113    /// Cool, we have results; let's share them with the world.
114    publish_matches: Sender<(Entity<Buffer>, Vec<Range<Anchor>>)>,
115}
116
117impl Worker<'_> {
118    async fn run(self) {
119        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
120        let mut find_first_match = pin!(self.find_first_match_rx.fuse());
121        let mut scan_path = pin!(self.input_paths_rx.fuse());
122        loop {
123            select_biased! {
124                find_all_matches = find_all_matches.next() => {
125
126                },
127                find_first_match = find_first_match.next() => {
128
129                },
130                scan_path = scan_path.next() => {
131
132                 },
133                 complete => break,
134            }
135        }
136    }
137}