project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use collections::HashSet;
 12use fs::Fs;
 13use futures::{SinkExt, StreamExt, select_biased};
 14use gpui::{App, AsyncApp, Entity, WeakEntity};
 15use language::{Buffer, BufferSnapshot};
 16use postage::oneshot;
 17use smol::channel::{Receiver, Sender, bounded, unbounded};
 18
 19use util::{ResultExt, maybe};
 20use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
 21
 22use crate::{
 23    ProjectPath,
 24    buffer_store::BufferStore,
 25    search::{SearchQuery, SearchResult},
 26};
 27
/// Everything a single project-wide search needs; consumed by [`ProjectSearcher::run`].
pub(crate) struct ProjectSearcher {
    /// Filesystem used by the workers to open on-disk files when checking whether their
    /// contents match the query.
    pub(crate) fs: Arc<dyn Fs>,
    /// Weak handle to the buffer store through which candidate paths are opened as buffers.
    pub(crate) buffer_store: WeakEntity<BufferStore>,
    /// Worktree snapshots whose entries are enumerated as search candidates, each paired
    /// with the settings consulted for that worktree's path-exclusion check.
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
    /// Entries that skip the on-disk pre-check and go straight to the full buffer scan.
    /// Presumably the entries that already have a buffer open -- confirm against the caller.
    pub(crate) open_buffers: HashSet<ProjectEntryId>,
}
 34
/// Cap on the number of fully scanned buffers; once the shared buffer counter goes past it,
/// results stop being published and `SearchResult::LimitReached` is reported.
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
/// Cap on the total number of match ranges; once the shared range counter goes past it,
/// results stop being published and `SearchResult::LimitReached` is reported.
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 37
 38impl ProjectSearcher {
 39    pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
 40        let executor = cx.background_executor().clone();
 41        let (tx, rx) = unbounded();
 42        cx.spawn(async move |cx| {
 43            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 44            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
 45            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 46                bounded(MAX_CONCURRENT_BUFFER_OPENS);
 47            let matches_count = AtomicUsize::new(0);
 48            let matched_buffer_count = AtomicUsize::new(0);
 49            let worker_pool = executor.scoped(|scope| {
 50                let (input_paths_tx, input_paths_rx) = bounded(64);
 51                let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64);
 52                let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
 53                for _ in 0..executor.num_cpus() {
 54                    let worker = Worker {
 55                        query: &query,
 56                        open_buffers: &self.open_buffers,
 57                        matched_buffer_count: &matched_buffer_count,
 58                        matches_count: &matches_count,
 59                        fs: &*self.fs,
 60                        input_paths_rx: input_paths_rx.clone(),
 61                        confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
 62                        confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
 63                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
 64                        find_all_matches_rx: find_all_matches_rx.clone(),
 65                        publish_matches: tx.clone(),
 66                    };
 67                    scope.spawn(worker.run());
 68                }
 69                scope.spawn(self.provide_search_paths(
 70                    &query,
 71                    input_paths_tx,
 72                    sorted_search_results_tx,
 73                ));
 74                scope.spawn(self.maintain_sorted_search_results(
 75                    sorted_search_results_rx,
 76                    get_buffer_for_full_scan_tx,
 77                ))
 78            });
 79            let open_buffers =
 80                self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx);
 81            futures::future::join(worker_pool, open_buffers).await;
 82
 83            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
 84                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
 85            if limit_reached {
 86                _ = tx.send(SearchResult::LimitReached).await;
 87            }
 88        })
 89        .detach();
 90        rx
 91    }
 92
 93    async fn provide_search_paths<'this>(
 94        &'this self,
 95        query: &SearchQuery,
 96        tx: Sender<InputPath<'this>>,
 97        results: Sender<oneshot::Receiver<ProjectPath>>,
 98    ) {
 99        for (snapshot, worktree_settings) in &self.snapshots {
100            for entry in snapshot.entries(query.include_ignored(), 0) {
101                let (should_scan_tx, should_scan_rx) = oneshot::channel();
102                let Ok(_) = tx
103                    .send(InputPath {
104                        entry,
105                        settings: worktree_settings,
106                        snapshot: snapshot,
107                        should_scan_tx,
108                    })
109                    .await
110                else {
111                    return;
112                };
113                if results.send(should_scan_rx).await.is_err() {
114                    return;
115                };
116            }
117        }
118    }
119
120    async fn maintain_sorted_search_results(
121        &self,
122        rx: Receiver<oneshot::Receiver<ProjectPath>>,
123        paths_for_full_scan: Sender<ProjectPath>,
124    ) {
125        let mut rx = pin!(rx);
126        while let Some(mut next_path_result) = rx.next().await {
127            let Some(successful_path) = next_path_result.next().await else {
128                // This math did not produce a match, hence skip it.
129                continue;
130            };
131            if paths_for_full_scan.send(successful_path).await.is_err() {
132                return;
133            };
134        }
135    }
136
137    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
138    async fn open_buffers<'a>(
139        &'a self,
140        rx: Receiver<ProjectPath>,
141        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
142        cx: &mut AsyncApp,
143    ) {
144        _ = maybe!(async move {
145            while let Ok(requested_path) = rx.recv().await {
146                let Some(buffer) = self
147                    .buffer_store
148                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
149                    .await
150                    .log_err()
151                else {
152                    continue;
153                };
154                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
155                find_all_matches_tx.send((buffer, snapshot)).await?;
156            }
157            Result::<_, anyhow::Error>::Ok(())
158        })
159        .await;
160    }
161}
162
/// One member of the background worker pool. Every worker serves all pipeline stages: it
/// pulls requests from the receivers below and pushes its findings into the senders.
struct Worker<'search> {
    /// The query being searched for.
    query: &'search SearchQuery,
    /// Shared tally of buffers that went through the full scan; compared against
    /// `MAX_SEARCH_RESULT_FILES`.
    matched_buffer_count: &'search AtomicUsize,
    /// Shared tally of match ranges found so far; compared against `MAX_SEARCH_RESULT_RANGES`.
    matches_count: &'search AtomicUsize,
    /// Entries that skip the on-disk pre-check and are requested for a full scan right away.
    open_buffers: &'search HashSet<ProjectEntryId>,
    /// Filesystem used to read on-disk contents during the pre-check.
    fs: &'search dyn Fs,
    /// Start off with all paths in project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    /// Put another way: filter out files that can't match (without looking at file contents)
    input_paths_rx: Receiver<InputPath<'search>>,

    /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
    /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}
189
190impl Worker<'_> {
191    async fn run(self) {
192        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
193        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
194        let mut scan_path = pin!(self.input_paths_rx.fuse());
195        let handler = RequestHandler {
196            query: self.query,
197            open_entries: &self.open_buffers,
198            fs: self.fs,
199            matched_buffer_count: self.matched_buffer_count,
200            matches_count: self.matches_count,
201            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
202            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
203            publish_matches: &self.publish_matches,
204        };
205        loop {
206            select_biased! {
207                find_all_matches = find_all_matches.next() => {
208                    let result = handler.handle_find_all_matches(find_all_matches).await;
209                    if let Some(_should_bail) = result {
210                        return;
211                    }
212                },
213                find_first_match = find_first_match.next() => {
214                    if let Some(buffer_with_at_least_one_match) = find_first_match {
215                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
216                    }
217
218                },
219                scan_path = scan_path.next() => {
220                    if let Some(path_to_scan) = scan_path {
221                        handler.handle_scan_path(path_to_scan).await;
222                    }
223
224                 }
225                 complete => break,
226            }
227        }
228    }
229}
230
/// Borrowed view of a [`Worker`]'s state that implements the work done for each pipeline
/// stage (`handle_scan_path`, `handle_find_first_match`, `handle_find_all_matches`).
struct RequestHandler<'worker> {
    /// The query being searched for.
    query: &'worker SearchQuery,
    /// Filesystem used to open on-disk files for the content pre-check.
    fs: &'worker dyn Fs,
    /// Entries that bypass the pre-check and are requested for a full scan directly.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Shared tally of fully scanned buffers; compared against `MAX_SEARCH_RESULT_FILES`.
    matched_buffer_count: &'worker AtomicUsize,
    /// Shared tally of match ranges; compared against `MAX_SEARCH_RESULT_RANGES`.
    matches_count: &'worker AtomicUsize,

    /// Queue of files whose on-disk contents still have to be checked for a first match.
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    /// Queue of paths that should be opened as buffers and scanned in full.
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    /// Outgoing channel for finished search results.
    publish_matches: &'worker Sender<SearchResult>,
}
242
/// Marker returned by `RequestHandler::handle_find_all_matches` when one of the result caps
/// has been exceeded; the worker stops as soon as it sees it.
struct LimitReached;
244
245impl RequestHandler<'_> {
246    async fn handle_find_all_matches(
247        &self,
248        req: Option<(Entity<Buffer>, BufferSnapshot)>,
249    ) -> Option<LimitReached> {
250        let Some((buffer, snapshot)) = req else {
251            unreachable!()
252        };
253        let ranges = self
254            .query
255            .search(&snapshot, None)
256            .await
257            .iter()
258            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
259            .collect::<Vec<_>>();
260
261        let matched_ranges = ranges.len();
262        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
263            || self
264                .matches_count
265                .fetch_add(matched_ranges, Ordering::Release)
266                > MAX_SEARCH_RESULT_RANGES
267        {
268            Some(LimitReached)
269        } else {
270            _ = self
271                .publish_matches
272                .send(SearchResult::Buffer { buffer, ranges })
273                .await;
274            None
275        }
276    }
277    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
278        _=maybe!(async move {
279            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
280            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
281                return anyhow::Ok(());
282            };
283
284            let mut file = BufReader::new(file);
285            let file_start = file.fill_buf()?;
286
287            if let Err(Some(starting_position)) =
288            std::str::from_utf8(file_start).map_err(|e| e.error_len())
289            {
290                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
291                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
292                log::debug!(
293                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
294                );
295                return Ok(());
296            }
297
298            if self.query.detect(file).unwrap_or(false) {
299                // Yes, we should scan the whole file.
300                entry.should_scan_tx.send(entry.path).await?;
301            }
302            Ok(())
303        }).await;
304    }
305
306    async fn handle_scan_path(&self, req: InputPath<'_>) {
307        _ = maybe!(async move {
308            let InputPath {
309                entry,
310                settings,
311                snapshot,
312                should_scan_tx,
313            } = req;
314            if entry.is_dir() && entry.is_ignored {
315                if !settings.is_path_excluded(&entry.path) {
316                    // Self::scan_ignored_dir(
317                    //     self.fs,
318                    //     &snapshot,
319                    //     &entry.path,
320                    //     self.query,
321                    //     &filter_tx,
322                    //     &output_tx,
323                    // )
324                    // .await?;
325                }
326                return Ok(());
327            }
328
329            if entry.is_fifo || !entry.is_file() {
330                return Ok(());
331            }
332
333            if self.query.filters_path() {
334                let matched_path = if self.query.match_full_paths() {
335                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
336                    full_path.push(entry.path.as_std_path());
337                    self.query.match_path(&full_path)
338                } else {
339                    self.query.match_path(entry.path.as_std_path())
340                };
341                if !matched_path {
342                    return Ok(());
343                }
344            }
345
346            if self.open_entries.contains(&entry.id) {
347                // The buffer is already in memory and that's the version we want to scan;
348                // hence skip the dilly-dally and look for all matches straight away.
349                self.get_buffer_for_full_scan_tx
350                    .send(ProjectPath {
351                        worktree_id: snapshot.id(),
352                        path: entry.path.clone(),
353                    })
354                    .await?;
355            } else {
356                self.confirm_contents_will_match_tx
357                    .send(MatchingEntry {
358                        should_scan_tx: should_scan_tx,
359                        worktree_root: snapshot.abs_path().clone(),
360                        path: ProjectPath {
361                            worktree_id: snapshot.id(),
362                            path: entry.path.clone(),
363                        },
364                    })
365                    .await?;
366            }
367
368            anyhow::Ok(())
369        })
370        .await;
371    }
372}
373
/// A worktree entry handed to the workers for path-level filtering.
struct InputPath<'worker> {
    /// The entry under consideration.
    entry: &'worker Entry,
    /// Settings of the entry's worktree, consulted for the path-exclusion check.
    settings: &'worker WorktreeSettings,
    /// Snapshot the entry belongs to; supplies the worktree id, root name and absolute path.
    snapshot: &'worker Snapshot,
    /// Used to report the entry's path once its contents are confirmed to contain a match;
    /// dropping it without sending signals "no match" to the receiving side.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
380
/// A file that passed path-level filtering and now needs its on-disk contents checked for
/// at least one match.
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path` to locate the file on disk.
    worktree_root: Arc<Path>,
    /// Project-relative identity of the file, reported back when a match is found.
    path: ProjectPath,
    /// Receives `path` if the file contains a match; otherwise it is dropped unused.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}