project_search.rs

  1use std::{
  2    ops::{ControlFlow, Range},
  3    path::Path,
  4    pin::{Pin, pin},
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use futures::{FutureExt, StreamExt, future::BoxFuture, select_biased};
 12use gpui::{App, AsyncApp, Entity, WeakEntity};
 13use language::{Buffer, BufferSnapshot};
 14use postage::oneshot;
 15use smol::channel::{Receiver, Sender, bounded, unbounded};
 16use text::Anchor;
 17use util::{ResultExt, maybe};
 18use worktree::{Entry, Snapshot, WorktreeSettings};
 19
 20use crate::{
 21    ProjectPath,
 22    buffer_store::BufferStore,
 23    search::{SearchQuery, SearchResult},
 24};
 25
 26pub(crate) struct ProjectSearcher {
 27    buffer_store: WeakEntity<BufferStore>,
 28    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
 29}
 30
 31const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 32const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 33
 34impl ProjectSearcher {
 35    pub(crate) fn search(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
 36        let executor = cx.background_executor().clone();
 37        let (tx, rx) = unbounded();
 38        cx.spawn(async move |cx| {
 39            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 40            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
 41            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 42                bounded(MAX_CONCURRENT_BUFFER_OPENS);
 43            let matches_count = AtomicUsize::new(0);
 44            let matched_buffer_count = AtomicUsize::new(0);
 45            let worker_pool = executor.scoped(|scope| {
 46                let (input_paths_tx, input_paths_rx) = bounded(64);
 47                let (find_first_match_tx, find_first_match_rx) = bounded(64);
 48
 49                for _ in 0..executor.num_cpus() {
 50                    let worker = Worker {
 51                        query: &query,
 52                        matched_buffer_count: &matched_buffer_count,
 53                        matches_count: &matches_count,
 54                        input_paths_rx: input_paths_rx.clone(),
 55                        find_first_match_rx: find_first_match_rx.clone(),
 56                        find_first_match_tx: find_first_match_tx.clone(),
 57                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
 58                        find_all_matches_rx: find_all_matches_rx.clone(),
 59                        publish_matches: tx.clone(),
 60                    };
 61                    scope.spawn(worker.run());
 62                }
 63                scope.spawn(self.provide_search_paths(&query, input_paths_tx))
 64            });
 65            self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx)
 66                .await;
 67            worker_pool.await;
 68            let limit_reached = matches_count.load(Ordering::Release) > MAX_SEARCH_RESULT_RANGES
 69                || matched_buffer_count.load(Ordering::Release) > MAX_SEARCH_RESULT_FILES;
 70            if limit_reached {
 71                _ = tx.send(SearchResult::LimitReached).await;
 72            }
 73        })
 74        .detach();
 75        rx
 76    }
 77
 78    async fn provide_search_paths<'a>(
 79        &'a self,
 80        query: &SearchQuery,
 81        tx: Sender<(&'a Entry, &'a WorktreeSettings)>,
 82    ) {
 83        for (snapshot, worktree_settings) in &self.snapshots {
 84            for entry in snapshot.entries(query.include_ignored(), 0) {
 85                let Ok(_) = tx.send((entry, worktree_settings)).await else {
 86                    return;
 87                };
 88            }
 89        }
 90    }
 91
 92    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
 93    async fn open_buffers<'a>(
 94        &'a self,
 95        rx: Receiver<ProjectPath>,
 96        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
 97        cx: &mut AsyncApp,
 98    ) {
 99        _ = maybe!(async move {
100            while let Ok(requested_path) = rx.recv().await {
101                let Some(buffer) = self
102                    .buffer_store
103                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
104                    .await
105                    .log_err()
106                else {
107                    continue;
108                };
109                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
110                find_all_matches_tx.send((buffer, snapshot)).await?;
111            }
112            Result::<_, anyhow::Error>::Ok(())
113        })
114        .await;
115    }
116}
117
118struct Worker<'search> {
119    query: &'search SearchQuery,
120    matched_buffer_count: &'search AtomicUsize,
121    matches_count: &'search AtomicUsize,
122    /// Start off with all paths in project and filter them based on:
123    /// - Include filters
124    /// - Exclude filters
125    /// - Only open buffers
126    /// - Scan ignored files
127    /// Put another way: filter out files that can't match (without looking at file contents)
128    input_paths_rx: Receiver<InputPath<'search>>,
129    /// After that, figure out which paths contain at least one match (look at file contents). That's called "partial scan".
130    find_first_match_tx: Sender<MatchingEntry>,
131    find_first_match_rx: Receiver<MatchingEntry>,
132    /// Of those that contain at least one match, look for rest of matches (and figure out their ranges).
133    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
134    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
135    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
136    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
137    /// Cool, we have results; let's share them with the world.
138    publish_matches: Sender<SearchResult>,
139}
140
141impl Worker<'_> {
142    async fn run(self) {
143        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
144        let mut find_first_match = pin!(self.find_first_match_rx.fuse());
145        let mut scan_path = pin!(self.input_paths_rx.fuse());
146        let handler = RequestHandler {
147            query: self.query,
148            matched_buffer_count: self.matched_buffer_count,
149            matches_count: self.matches_count,
150            find_first_match_tx: &self.find_first_match_tx,
151            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
152            publish_matches: &self.publish_matches,
153        };
154        loop {
155            select_biased! {
156                find_all_matches = find_all_matches.next() => {
157                    let result = handler.handle_find_all_matches(find_all_matches).await;
158                    if let Some(should_bail) = result {
159                        return;
160                    }
161                },
162                find_first_match = find_first_match.next() => {
163
164                },
165                scan_path = scan_path.next() => {
166                    handler.handle_scan_path(scan_path).await;
167                 }
168                 complete => break,
169            }
170        }
171    }
172}
173
174struct RequestHandler<'worker> {
175    query: &'worker SearchQuery,
176    matched_buffer_count: &'worker AtomicUsize,
177    matches_count: &'worker AtomicUsize,
178
179    find_first_match_tx: &'worker Sender<MatchingEntry>,
180    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
181    publish_matches: &'worker Sender<SearchResult>,
182}
183
/// Marker returned by a request handler when a search result cap has been exceeded.
struct LimitReached;
185
186impl RequestHandler<'_> {
187    async fn handle_find_all_matches(
188        &self,
189        req: Option<(Entity<Buffer>, BufferSnapshot)>,
190    ) -> Option<LimitReached> {
191        let Some((buffer, snapshot)) = req else {
192            unreachable!()
193        };
194        let ranges = self
195            .query
196            .search(&snapshot, None)
197            .await
198            .iter()
199            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
200            .collect::<Vec<_>>();
201
202        let matched_ranges = ranges.len();
203        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
204            || self
205                .matches_count
206                .fetch_add(matched_ranges, Ordering::Release)
207                > MAX_SEARCH_RESULT_RANGES
208        {
209            Some(LimitReached)
210        } else {
211            self.publish_matches
212                .send(SearchResult::Buffer { buffer, ranges })
213                .await;
214            None
215        }
216    }
217    async fn handle_scan_path(&self, req: InputPath<'_>) {
218        let InputPath {
219            entry,
220            settings,
221            snapshot,
222        } = req;
223        if entry.is_dir() && entry.is_ignored {
224            if !settings.is_path_excluded(&entry.path) {
225                Self::scan_ignored_dir(&fs, &snapshot, &entry.path, &query, &filter_tx, &output_tx)
226                    .await?;
227            }
228            return None;
229            // continue;
230        }
231
232        if entry.is_fifo || !entry.is_file() {
233            return None;
234        }
235
236        if self.query.filters_path() {
237            let matched_path = if self.query.match_full_paths() {
238                let mut full_path = snapshot.root_name().as_std_path().to_owned();
239                full_path.push(entry.path.as_std_path());
240                self.query.match_path(&full_path)
241            } else {
242                self.query.match_path(entry.path.as_std_path())
243            };
244            if !matched_path {
245                return None;
246                // continue;
247            }
248        }
249
250        let (mut tx, rx) = oneshot::channel();
251
252        if open_entries.contains(&entry.id) {
253            tx.send(ProjectPath {
254                worktree_id: snapshot.id(),
255                path: entry.path.clone(),
256            })
257            .await?;
258        } else {
259            filter_tx
260                .send(MatchingEntry {
261                    respond: tx,
262                    worktree_root: snapshot.abs_path().clone(),
263                    path: ProjectPath {
264                        worktree_id: snapshot.id(),
265                        path: entry.path.clone(),
266                    },
267                })
268                .await?;
269        }
270
271        output_tx.send(rx).await?;
272    }
273}
274
275struct InputPath<'worker> {
276    entry: &'worker Entry,
277    settings: &'worker WorktreeSettings,
278    snapshot: &'worker Snapshot,
279}
280
281struct MatchingEntry {
282    worktree_root: Arc<Path>,
283    path: ProjectPath,
284    respond: oneshot::Sender<ProjectPath>,
285}