project_search.rs

use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased};
use gpui::{App, AsyncApp, Entity, WeakEntity};
use language::{Buffer, BufferSnapshot};
use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};

use util::{ResultExt, maybe};
use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};

use crate::{
    ProjectPath,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
};

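/// Runs project-wide text searches: walks worktree snapshots, filters candidate paths,
/// opens matching buffers on the main thread, and streams [`SearchResult`]s back to the
/// caller.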
pub(crate) struct ProjectSearcher {
    fs: Arc<dyn Fs>,
    buffer_store: WeakEntity<BufferStore>,
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
    open_buffers: HashSet<ProjectEntryId>,
}

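// Hard caps on how many results a single project search may report; when either is
// exceeded, workers stop publishing and `SearchResult::LimitReached` is emitted.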
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

impl ProjectSearcher {
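    /// Kicks off the search and returns the receiving end of the results channel.
    /// Filtering and content scanning happen on background workers, while buffers are
    /// opened on the main thread via [`Self::open_buffers`].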
    pub(crate) fn search(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        cx.spawn(async move |cx| {
            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
                bounded(MAX_CONCURRENT_BUFFER_OPENS);
            let matches_count = AtomicUsize::new(0);
            let matched_buffer_count = AtomicUsize::new(0);
            let worker_pool = executor.scoped(|scope| {
                let (input_paths_tx, input_paths_rx) = bounded(64);
                let (find_first_match_tx, find_first_match_rx) = bounded(64);
                let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                for _ in 0..executor.num_cpus() {
                    let worker = Worker {
                        query: &query,
                        open_buffers: &self.open_buffers,
                        matched_buffer_count: &matched_buffer_count,
                        matches_count: &matches_count,
                        fs: &*self.fs,
                        input_paths_rx: input_paths_rx.clone(),
                        find_first_match_rx: find_first_match_rx.clone(),
                        find_first_match_tx: find_first_match_tx.clone(),
                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                        find_all_matches_rx: find_all_matches_rx.clone(),
                        publish_matches: tx.clone(),
                    };
                    scope.spawn(worker.run());
                }
                scope.spawn(self.provide_search_paths(
                    &query,
                    input_paths_tx,
                    sorted_search_results_tx,
                ));
                scope.spawn(self.maintain_sorted_search_results(
                    sorted_search_results_rx,
                    get_buffer_for_full_scan_tx,
                ));
            });
            // Buffer opening has to happen on the main thread, so drive it concurrently
            // with the background worker pool.
            let open_buffers =
                self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx);
            futures::join!(worker_pool, open_buffers);
            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
            if limit_reached {
                _ = tx.send(SearchResult::LimitReached).await;
            }
        })
        .detach();
        rx
    }

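    /// Walks all worktree snapshots in order, handing every entry to the workers for
    /// filtering. For each entry, the matching one-shot receiver is registered with
    /// `maintain_sorted_search_results` so results can be kept in traversal order.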
    async fn provide_search_paths<'this>(
        &'this self,
        query: &SearchQuery,
        tx: Sender<InputPath<'this>>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) {
        for (snapshot, worktree_settings) in &self.snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                let (should_scan_tx, should_scan_rx) = oneshot::channel();
                let Ok(_) = tx
                    .send(InputPath {
                        entry,
                        settings: worktree_settings,
                        snapshot,
                        should_scan_tx,
                    })
                    .await
                else {
                    return;
                };
                if results.send(should_scan_rx).await.is_err() {
                    return;
                }
            }
        }
    }

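    /// Awaits the per-path one-shot receivers in the order the paths were discovered
    /// and forwards those that turned out to contain a match, preserving worktree
    /// traversal order for the full-scan stage.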
    async fn maintain_sorted_search_results(
        &self,
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
    ) {
        let mut rx = pin!(rx);
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            }
        }
    }

    /// Background workers cannot open buffers by themselves, hence the main thread
    /// does it on their behalf.
    async fn open_buffers<'a>(
        &'a self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        cx: &mut AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(requested_path) = rx.recv().await {
                let Some(buffer) = self
                    .buffer_store
                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
                    .await
                    .log_err()
                else {
                    continue;
                };
                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

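/// A search-pipeline worker. A pool of these runs on the background executor; each one
/// services all of the pipeline stages below via `select_biased!` in [`Worker::run`],
/// polling later stages before earlier ones.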
struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    fs: &'search dyn Fs,
    /// Start off with all paths in the project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents).
    input_paths_rx: Receiver<InputPath<'search>>,
    /// After that, figure out which paths contain at least one match by looking at file contents. That's called a "partial scan".
    find_first_match_tx: Sender<MatchingEntry>,
    find_first_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match, look for the rest of the matches (and figure out their ranges).
    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// OK, we're back in the background: run a full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
    async fn run(self) {
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.find_first_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());
        let handler = RequestHandler {
            query: self.query,
            open_entries: self.open_buffers,
            fs: self.fs,
            matched_buffer_count: self.matched_buffer_count,
            matches_count: self.matches_count,
            find_first_match_tx: &self.find_first_match_tx,
            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
            publish_matches: &self.publish_matches,
        };
        loop {
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    if let Some(LimitReached) =
                        handler.handle_find_all_matches(find_all_matches).await
                    {
                        return;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    }
                },
                complete => break,
            }
        }
    }
}

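/// Borrowed view of a [`Worker`]'s state with one handler per pipeline stage.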
struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    find_first_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

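/// Marker returned by [`RequestHandler::handle_find_all_matches`] once the search
/// result limits have been exceeded, telling the worker to stop.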
struct LimitReached;

impl RequestHandler<'_> {
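    /// Runs the query over an open buffer's snapshot, converts matches to anchor
    /// ranges, and publishes them unless `MAX_SEARCH_RESULT_FILES` or
    /// `MAX_SEARCH_RESULT_RANGES` has been exceeded.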
    async fn handle_find_all_matches(
        &self,
        req: Option<(Entity<Buffer>, BufferSnapshot)>,
    ) -> Option<LimitReached> {
        let Some((buffer, snapshot)) = req else {
            // The channel has closed; there is nothing left to scan.
            return None;
        };
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > MAX_SEARCH_RESULT_RANGES
        {
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }

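    /// Checks the on-disk contents for at least one match, skipping files whose first
    /// chunk is not valid UTF-8. Matching paths are reported through the entry's
    /// one-shot sender so they can be opened and fully scanned.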
    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(invalid_sequence_length)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file contents, throw away files that contain
                // invalid UTF-8 sequences early on; that way we can still match the rest in a
                // streaming fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence of length {invalid_sequence_length} near the start of file {abs_path:?}"
                );
                return Ok(());
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

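    /// Applies the content-free filters (ignored status, entry type, path
    /// include/exclude patterns). Entries backed by an already-open buffer go straight
    /// to the full-scan stage; everything else is queued for the on-disk first-match check.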
    async fn handle_scan_path(&self, req: InputPath<'_>) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                settings,
                snapshot,
                should_scan_tx,
            } = req;
            if entry.is_dir() && entry.is_ignored {
                if !settings.is_path_excluded(&entry.path) {
                    Self::scan_ignored_dir(
                        self.fs,
                        &snapshot,
                        &entry.path,
                        self.query,
                        &filter_tx,
                        &output_tx,
                    )
                    .await?;
                }
                return Ok(());
            }

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.find_first_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

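/// A worktree entry awaiting filtering, along with the one-shot sender used to report
/// whether it should receive a full scan.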
struct InputPath<'worker> {
    entry: &'worker Entry,
    settings: &'worker WorktreeSettings,
    snapshot: &'worker Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

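/// A path that passed filtering and awaits the on-disk check for a first match.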
struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}