project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use collections::HashSet;
 12use fs::Fs;
 13use futures::{SinkExt, StreamExt, select_biased};
 14use gpui::{App, AsyncApp, Entity, WeakEntity};
 15use language::{Buffer, BufferSnapshot};
 16use postage::oneshot;
 17use smol::channel::{Receiver, Sender, bounded, unbounded};
 18
 19use util::{ResultExt, maybe};
 20use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
 21
 22use crate::{
 23    ProjectPath,
 24    buffer_store::BufferStore,
 25    search::{SearchQuery, SearchResult},
 26};
 27
/// Coordinates a project-wide search: fans worktree paths out to background
/// workers and opens matching buffers on the main thread on their behalf.
pub(crate) struct ProjectSearcher {
    /// Filesystem handle workers use to read unopened files off disk.
    pub(crate) fs: Arc<dyn Fs>,
    /// Used on the main thread to open buffers for paths confirmed to match.
    pub(crate) buffer_store: WeakEntity<BufferStore>,
    /// One snapshot + settings pair per worktree to be searched.
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
    /// Entries whose buffers are already loaded; these skip the on-disk
    /// pre-filter and are scanned from the in-memory buffer directly.
    pub(crate) open_buffers: HashSet<ProjectEntryId>,
}
 34
/// Cap on the number of matching files reported before the search is truncated.
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
/// Cap on the total number of matched ranges across all files.
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 37
 38impl ProjectSearcher {
 39    pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
 40        let executor = cx.background_executor().clone();
 41        let (tx, rx) = unbounded();
 42        cx.spawn(async move |cx| {
 43            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 44            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
 45            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 46                bounded(MAX_CONCURRENT_BUFFER_OPENS);
 47            let matches_count = AtomicUsize::new(0);
 48            let matched_buffer_count = AtomicUsize::new(0);
 49            let worker_pool = executor.scoped(|scope| {
 50                let (input_paths_tx, input_paths_rx) = bounded(64);
 51                let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64);
 52                let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
 53                for _ in 0..executor.num_cpus() {
 54                    let worker = Worker {
 55                        query: &query,
 56                        open_buffers: &self.open_buffers,
 57                        matched_buffer_count: &matched_buffer_count,
 58                        matches_count: &matches_count,
 59                        fs: &*self.fs,
 60                        input_paths_rx: input_paths_rx.clone(),
 61                        confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
 62                        confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
 63                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
 64                        find_all_matches_rx: find_all_matches_rx.clone(),
 65                        publish_matches: tx.clone(),
 66                    };
 67                    scope.spawn(worker.run());
 68                }
 69                scope.spawn(self.provide_search_paths(
 70                    &query,
 71                    input_paths_tx,
 72                    sorted_search_results_tx,
 73                ));
 74                scope.spawn(self.maintain_sorted_search_results(
 75                    sorted_search_results_rx,
 76                    get_buffer_for_full_scan_tx,
 77                ))
 78            });
 79            self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx)
 80                .await;
 81            worker_pool.await;
 82            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
 83                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
 84            if limit_reached {
 85                _ = tx.send(SearchResult::LimitReached).await;
 86            }
 87        })
 88        .detach();
 89        rx
 90    }
 91
 92    async fn provide_search_paths<'this>(
 93        &'this self,
 94        query: &SearchQuery,
 95        tx: Sender<InputPath<'this>>,
 96        results: Sender<oneshot::Receiver<ProjectPath>>,
 97    ) {
 98        for (snapshot, worktree_settings) in &self.snapshots {
 99            for entry in snapshot.entries(query.include_ignored(), 0) {
100                let (should_scan_tx, should_scan_rx) = oneshot::channel();
101                let Ok(_) = tx
102                    .send(InputPath {
103                        entry,
104                        settings: worktree_settings,
105                        snapshot: snapshot,
106                        should_scan_tx,
107                    })
108                    .await
109                else {
110                    return;
111                };
112                results.send(should_scan_rx).await;
113            }
114        }
115    }
116
117    async fn maintain_sorted_search_results(
118        &self,
119        rx: Receiver<oneshot::Receiver<ProjectPath>>,
120        paths_for_full_scan: Sender<ProjectPath>,
121    ) {
122        let mut rx = pin!(rx);
123        while let Some(mut next_path_result) = rx.next().await {
124            let Some(successful_path) = next_path_result.next().await else {
125                // This math did not produce a match, hence skip it.
126                continue;
127            };
128            paths_for_full_scan.send(successful_path).await;
129        }
130    }
131
132    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
133    async fn open_buffers<'a>(
134        &'a self,
135        rx: Receiver<ProjectPath>,
136        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
137        cx: &mut AsyncApp,
138    ) {
139        _ = maybe!(async move {
140            while let Ok(requested_path) = rx.recv().await {
141                let Some(buffer) = self
142                    .buffer_store
143                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
144                    .await
145                    .log_err()
146                else {
147                    continue;
148                };
149                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
150                find_all_matches_tx.send((buffer, snapshot)).await?;
151            }
152            Result::<_, anyhow::Error>::Ok(())
153        })
154        .await;
155    }
156}
157
/// A single background search worker; several run concurrently, all pulling
/// work from the same shared channels.
struct Worker<'search> {
    /// The query every pipeline stage filters against.
    query: &'search SearchQuery,
    /// Shared count of buffers with at least one match (limit enforcement).
    matched_buffer_count: &'search AtomicUsize,
    /// Shared count of matched ranges across all buffers (limit enforcement).
    matches_count: &'search AtomicUsize,
    /// Entries already open in memory; these skip the on-disk pre-filter.
    open_buffers: &'search HashSet<ProjectEntryId>,
    /// Filesystem used to read file contents during the on-disk pre-filter.
    fs: &'search dyn Fs,
    /// Start off with all paths in project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    /// Put another way: filter out files that can't match (without looking at file contents)
    input_paths_rx: Receiver<InputPath<'search>>,

    /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
    /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}
184
185impl Worker<'_> {
186    async fn run(self) {
187        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
188        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
189        let mut scan_path = pin!(self.input_paths_rx.fuse());
190        let handler = RequestHandler {
191            query: self.query,
192            open_entries: &self.open_buffers,
193            fs: self.fs,
194            matched_buffer_count: self.matched_buffer_count,
195            matches_count: self.matches_count,
196            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
197            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
198            publish_matches: &self.publish_matches,
199        };
200        loop {
201            select_biased! {
202                find_all_matches = find_all_matches.next() => {
203                    let result = handler.handle_find_all_matches(find_all_matches).await;
204                    if let Some(_should_bail) = result {
205                        return;
206                    }
207                },
208                find_first_match = find_first_match.next() => {
209                    if let Some(buffer_with_at_least_one_match) = find_first_match {
210                        handler.handle_find_first_match(buffer_with_at_least_one_match);
211                    }
212
213                },
214                scan_path = scan_path.next() => {
215                    if let Some(path_to_scan) = scan_path {
216                        handler.handle_scan_path(path_to_scan).await;
217                    }
218
219                 }
220                 complete => break,
221            }
222        }
223    }
224}
225
/// Per-request logic shared by all workers; borrows a worker's channels and
/// the search-wide counters so each request is handled statelessly.
struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    /// Entries whose buffers are already in memory; routed straight to full scan.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Shared count of buffers with at least one match.
    matched_buffer_count: &'worker AtomicUsize,
    /// Shared count of matched ranges across all buffers.
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

/// Marker returned when the global file/range result limits have been exceeded.
struct LimitReached;
239
240impl RequestHandler<'_> {
241    async fn handle_find_all_matches(
242        &self,
243        req: Option<(Entity<Buffer>, BufferSnapshot)>,
244    ) -> Option<LimitReached> {
245        let Some((buffer, snapshot)) = req else {
246            unreachable!()
247        };
248        let ranges = self
249            .query
250            .search(&snapshot, None)
251            .await
252            .iter()
253            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
254            .collect::<Vec<_>>();
255
256        let matched_ranges = ranges.len();
257        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
258            || self
259                .matches_count
260                .fetch_add(matched_ranges, Ordering::Release)
261                > MAX_SEARCH_RESULT_RANGES
262        {
263            Some(LimitReached)
264        } else {
265            self.publish_matches
266                .send(SearchResult::Buffer { buffer, ranges })
267                .await;
268            None
269        }
270    }
271    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
272        _=maybe!(async move {
273            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
274            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
275                return anyhow::Ok(());
276            };
277
278            let mut file = BufReader::new(file);
279            let file_start = file.fill_buf()?;
280
281            if let Err(Some(starting_position)) =
282            std::str::from_utf8(file_start).map_err(|e| e.error_len())
283            {
284                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
285                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
286                log::debug!(
287                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
288                );
289                return Ok(());
290            }
291
292            if self.query.detect(file).unwrap_or(false) {
293                // Yes, we should scan the whole file.
294                entry.should_scan_tx.send(entry.path).await?;
295            }
296            Ok(())
297        }).await;
298    }
299
300    async fn handle_scan_path(&self, req: InputPath<'_>) {
301        _ = maybe!(async move {
302            let InputPath {
303                entry,
304                settings,
305                snapshot,
306                should_scan_tx,
307            } = req;
308            if entry.is_dir() && entry.is_ignored {
309                if !settings.is_path_excluded(&entry.path) {
310                    // Self::scan_ignored_dir(
311                    //     self.fs,
312                    //     &snapshot,
313                    //     &entry.path,
314                    //     self.query,
315                    //     &filter_tx,
316                    //     &output_tx,
317                    // )
318                    // .await?;
319                }
320                return Ok(());
321            }
322
323            if entry.is_fifo || !entry.is_file() {
324                return Ok(());
325            }
326
327            if self.query.filters_path() {
328                let matched_path = if self.query.match_full_paths() {
329                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
330                    full_path.push(entry.path.as_std_path());
331                    self.query.match_path(&full_path)
332                } else {
333                    self.query.match_path(entry.path.as_std_path())
334                };
335                if !matched_path {
336                    return Ok(());
337                }
338            }
339
340            if self.open_entries.contains(&entry.id) {
341                // The buffer is already in memory and that's the version we want to scan;
342                // hence skip the dilly-dally and look for all matches straight away.
343                self.get_buffer_for_full_scan_tx
344                    .send(ProjectPath {
345                        worktree_id: snapshot.id(),
346                        path: entry.path.clone(),
347                    })
348                    .await?;
349            } else {
350                self.confirm_contents_will_match_tx
351                    .send(MatchingEntry {
352                        should_scan_tx: should_scan_tx,
353                        worktree_root: snapshot.abs_path().clone(),
354                        path: ProjectPath {
355                            worktree_id: snapshot.id(),
356                            path: entry.path.clone(),
357                        },
358                    })
359                    .await?;
360            }
361
362            anyhow::Ok(())
363        })
364        .await;
365    }
366}
367
/// A candidate path flowing through the first pipeline stage, before any file
/// contents have been inspected.
struct InputPath<'worker> {
    entry: &'worker Entry,
    settings: &'worker WorktreeSettings,
    snapshot: &'worker Snapshot,
    /// Fired with the project path once the entry is confirmed to contain a
    /// match; awaited in submission order to keep results sorted.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

/// An entry that passed path-based filtering and now awaits the on-disk
/// "contains at least one match" check.
struct MatchingEntry {
    /// Absolute root of the entry's worktree, joined with `path` to open the file.
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}