project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use collections::HashSet;
 12use fs::Fs;
 13use futures::{SinkExt, StreamExt, select_biased};
 14use gpui::{App, AppContext, AsyncApp, Entity, Task};
 15use language::{Buffer, BufferSnapshot};
 16use postage::oneshot;
 17use smol::channel::{Receiver, Sender, bounded, unbounded};
 18
 19use util::{ResultExt, maybe};
 20use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
 21
 22use crate::{
 23    ProjectItem, ProjectPath,
 24    buffer_store::BufferStore,
 25    search::{SearchQuery, SearchResult},
 26};
 27
/// Everything a single project-wide search run needs; consumed by `Search::into_results`.
pub(crate) struct Search {
    /// File system handle; background workers use it to open files that are not loaded as buffers.
    pub(crate) fs: Arc<dyn Fs>,
    /// Buffer store that is read to enumerate existing buffers and asked to open buffers
    /// for paths that need a full scan.
    pub(crate) buffer_store: Entity<BufferStore>,
    /// Worktree snapshots, each paired with its settings; their entries are the search candidates.
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
    /// Entries that workers treat as already open: such entries skip the on-disk
    /// "does this file match at all" check and go straight to the full scan.
    pub(crate) open_buffers: HashSet<ProjectEntryId>,
}
 34
/// Threshold for the shared matched-buffer counter; once the counter exceeds it the search
/// stops publishing buffers and is considered to have hit its limit.
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
/// Threshold for the shared match-range counter; once the counter exceeds it the search
/// stops publishing buffers and is considered to have hit its limit.
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 37
/// Represents results of project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search.
///
/// The search itself does not start until one of the consuming accessors is called.
#[must_use]
pub(crate) struct SearchResultsHandle {
    /// Receiving end of the channel on which workers publish `SearchResult`s.
    results: Receiver<SearchResult>,
    /// Receiving end of the channel carrying buffers that are queued for a full scan.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// Deferred start of the search; invoked exactly once by whichever accessor consumes the handle.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
 46
 47impl SearchResultsHandle {
 48    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
 49        (self.trigger_search)(cx).detach();
 50        self.results
 51    }
 52    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
 53        (self.trigger_search)(cx).detach();
 54        self.matching_buffers
 55    }
 56}
 57
 58impl Search {
    /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
    /// or full search results.
    ///
    /// Nothing is searched here: the returned handle owns a closure that, once triggered, spawns
    /// the whole pipeline. The pipeline consists of:
    /// - one `Worker` per CPU reported by the background executor, all sharing the same channels,
    /// - `provide_search_paths`, which feeds worktree entries to the workers,
    /// - `maintain_sorted_search_results`, which forwards confirmed paths for a full scan,
    /// - `open_buffers` and `grab_buffer_snapshots`, which run alongside the worker pool and turn
    ///   paths into buffers and buffers into snapshots.
    pub(crate) fn into_results(self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        // NOTE(review): this local set is filled below but never read; the workers are handed
        // `self.open_buffers` instead and the name is shadowed later on. Confirm which set is
        // meant to drive the "already open" shortcut.
        let mut open_buffers = HashSet::default();
        // Searchable buffers without a project entry; they are queued for snapshotting directly.
        let mut unnamed_buffers = Vec::new();
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Leftover from the original author: `limit = limit.saturating_sub(1); todo!()`
                // NOTE(review): presumably unnamed buffers should count against a result limit — confirm.
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
        // Channel on which workers publish results; `rx` is what `SearchResultsHandle::results` returns.
        let (tx, rx) = unbounded();
        // Buffers waiting to be snapshotted for a full scan.
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        // NOTE(review): this is a clone of the receiver that `grab_buffer_snapshots` consumes below,
        // so a caller reading `matching_buffers` takes items from the same queue — confirm this is intended.
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(|cx: &mut App| {
            cx.spawn(async move |cx| {
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                // Snapshots ready for the full scan; bounded, so snapshotting waits when workers fall behind.
                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);

                // Paths whose buffers have to be opened before they can be scanned in full.
                let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
                // Counters shared by all workers to enforce the result limits.
                let matches_count = AtomicUsize::new(0);
                let matched_buffer_count = AtomicUsize::new(0);
                let worker_pool = executor.scoped(|scope| {
                    let (input_paths_tx, input_paths_rx) = bounded(64);
                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                        bounded(64);
                    let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                    for _ in 0..executor.num_cpus() {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &self.open_buffers,
                            matched_buffer_count: &matched_buffer_count,
                            matches_count: &matches_count,
                            fs: &*self.fs,
                            input_paths_rx: input_paths_rx.clone(),
                            confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
                            confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                            get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                            publish_matches: tx.clone(),
                        };
                        scope.spawn(worker.run());
                    }
                    scope.spawn(self.provide_search_paths(
                        &query,
                        input_paths_tx,
                        sorted_search_results_tx,
                    ));
                    scope.spawn(self.maintain_sorted_search_results(
                        sorted_search_results_rx,
                        get_buffer_for_full_scan_tx,
                    ))
                });
                // Shadows the (unused) set built at the top of the function.
                let open_buffers = self.open_buffers(
                    get_buffer_for_full_scan_rx,
                    grab_buffer_snapshot_tx,
                    cx.clone(),
                );
                let buffer_snapshots = self.grab_buffer_snapshots(
                    grab_buffer_snapshot_rx,
                    find_all_matches_tx,
                    cx.clone(),
                );
                // Run the pool and both helper loops concurrently until all of them finish.
                futures::future::join3(worker_pool, buffer_snapshots, open_buffers).await;

                let limit_reached = matches_count.load(Ordering::Acquire)
                    > MAX_SEARCH_RESULT_RANGES
                    || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
                if limit_reached {
                    // NOTE(review): every worker calls `publish_matches.close()` (a clone of `tx`)
                    // before it exits, so by this point the channel looks closed and this send
                    // would fail silently — confirm whether the receiver ever sees `LimitReached`.
                    _ = tx.send(SearchResult::LimitReached).await;
                }
            })
        });
        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }
150
151    async fn provide_search_paths<'this>(
152        &'this self,
153        query: &SearchQuery,
154        tx: Sender<InputPath<'this>>,
155        results: Sender<oneshot::Receiver<ProjectPath>>,
156    ) {
157        for (snapshot, worktree_settings) in &self.snapshots {
158            for entry in snapshot.entries(query.include_ignored(), 0) {
159                let (should_scan_tx, should_scan_rx) = oneshot::channel();
160                let Ok(_) = tx
161                    .send(InputPath {
162                        entry,
163                        settings: worktree_settings,
164                        snapshot: snapshot,
165                        should_scan_tx,
166                    })
167                    .await
168                else {
169                    return;
170                };
171                if results.send(should_scan_rx).await.is_err() {
172                    return;
173                };
174            }
175        }
176    }
177
178    async fn maintain_sorted_search_results(
179        &self,
180        rx: Receiver<oneshot::Receiver<ProjectPath>>,
181        paths_for_full_scan: Sender<ProjectPath>,
182    ) {
183        let mut rx = pin!(rx);
184        while let Some(mut next_path_result) = rx.next().await {
185            let Some(successful_path) = next_path_result.next().await else {
186                // This math did not produce a match, hence skip it.
187                continue;
188            };
189            if paths_for_full_scan.send(successful_path).await.is_err() {
190                return;
191            };
192        }
193    }
194
195    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
196    async fn open_buffers<'a>(
197        &'a self,
198        rx: Receiver<ProjectPath>,
199        find_all_matches_tx: Sender<Entity<Buffer>>,
200        mut cx: AsyncApp,
201    ) {
202        _ = maybe!(async move {
203            while let Ok(requested_path) = rx.recv().await {
204                let Some(buffer) = self
205                    .buffer_store
206                    .update(&mut cx, |this, cx| this.open_buffer(requested_path, cx))?
207                    .await
208                    .log_err()
209                else {
210                    continue;
211                };
212                find_all_matches_tx.send(buffer).await?;
213            }
214            Result::<_, anyhow::Error>::Ok(())
215        })
216        .await;
217    }
218
219    async fn grab_buffer_snapshots<'a>(
220        &'a self,
221        rx: Receiver<Entity<Buffer>>,
222        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
223        mut cx: AsyncApp,
224    ) {
225        _ = maybe!(async move {
226            while let Ok(buffer) = rx.recv().await {
227                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
228                find_all_matches_tx.send((buffer, snapshot)).await?;
229            }
230            Result::<_, anyhow::Error>::Ok(())
231        })
232        .await;
233    }
234}
235
/// One background worker of the search pipeline. All workers share the same channels and
/// counters; each one services every stage of the pipeline (see the field docs, top to bottom).
struct Worker<'search> {
    /// The query being searched for.
    query: &'search SearchQuery,
    /// Shared counter of buffers that went through the full scan.
    matched_buffer_count: &'search AtomicUsize,
    /// Shared counter of match ranges found so far.
    matches_count: &'search AtomicUsize,
    /// Entries that are already open as buffers; these skip the on-disk check.
    open_buffers: &'search HashSet<ProjectEntryId>,
    /// File system used to read files that are not open as buffers.
    fs: &'search dyn Fs,
    /// Start off with all paths in project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    /// Put another way: filter out files that can't match (without looking at file contents)
    input_paths_rx: Receiver<InputPath<'search>>,

    /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
    /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    /// Receiving side of the stage above: every worker both produces and consumes these requests.
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}
262
263impl Worker<'_> {
264    async fn run(self) {
265        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
266        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
267        let mut scan_path = pin!(self.input_paths_rx.fuse());
268        let handler = RequestHandler {
269            query: self.query,
270            open_entries: &self.open_buffers,
271            fs: self.fs,
272            matched_buffer_count: self.matched_buffer_count,
273            matches_count: self.matches_count,
274            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
275            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
276            publish_matches: &self.publish_matches,
277        };
278
279        loop {
280            select_biased! {
281                find_all_matches = find_all_matches.next() => {
282                    let Some(matches) = find_all_matches else {
283                        self.publish_matches.close();
284                        continue;
285                        };
286                    let result = handler.handle_find_all_matches(matches).await;
287                    if let Some(_should_bail) = result {
288                        self.publish_matches.close();
289                        break;
290                    }
291                },
292                find_first_match = find_first_match.next() => {
293                    if let Some(buffer_with_at_least_one_match) = find_first_match {
294                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
295                    } else {
296                        self.get_buffer_for_full_scan_tx.close();
297                    }
298
299                },
300                scan_path = scan_path.next() => {
301                    if let Some(path_to_scan) = scan_path {
302                        handler.handle_scan_path(path_to_scan).await;
303                    } else {
304                        self.confirm_contents_will_match_tx.close();
305                    }
306
307                 }
308                 complete => break,
309            }
310        }
311    }
312}
313
/// Borrowed view of a `Worker`'s state that implements the per-request work of each pipeline stage.
struct RequestHandler<'worker> {
    /// The query being searched for.
    query: &'worker SearchQuery,
    /// File system used to read files from disk during first-match detection.
    fs: &'worker dyn Fs,
    /// Entries that are already open as buffers; they bypass first-match detection.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Shared counter of buffers that went through the full scan.
    matched_buffer_count: &'worker AtomicUsize,
    /// Shared counter of match ranges found so far.
    matches_count: &'worker AtomicUsize,

    /// Queue of files whose on-disk contents still need to be checked for a first match.
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    /// Queue of paths whose buffers must be opened for a full scan.
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    /// Channel on which search results are published.
    publish_matches: &'worker Sender<SearchResult>,
}
325
/// Marker returned by `RequestHandler::handle_find_all_matches` when a result limit was exceeded
/// and the worker should stop.
struct LimitReached;
327
328impl RequestHandler<'_> {
329    async fn handle_find_all_matches(
330        &self,
331        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
332    ) -> Option<LimitReached> {
333        let ranges = self
334            .query
335            .search(&snapshot, None)
336            .await
337            .iter()
338            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
339            .collect::<Vec<_>>();
340
341        let matched_ranges = ranges.len();
342        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
343            || self
344                .matches_count
345                .fetch_add(matched_ranges, Ordering::Release)
346                > MAX_SEARCH_RESULT_RANGES
347        {
348            Some(LimitReached)
349        } else {
350            _ = self
351                .publish_matches
352                .send(SearchResult::Buffer { buffer, ranges })
353                .await;
354            None
355        }
356    }
357    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
358        _=maybe!(async move {
359            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
360            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
361                return anyhow::Ok(());
362            };
363
364            let mut file = BufReader::new(file);
365            let file_start = file.fill_buf()?;
366
367            if let Err(Some(starting_position)) =
368            std::str::from_utf8(file_start).map_err(|e| e.error_len())
369            {
370                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
371                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
372                log::debug!(
373                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
374                );
375                return Ok(());
376            }
377
378            if self.query.detect(file).unwrap_or(false) {
379                // Yes, we should scan the whole file.
380                entry.should_scan_tx.send(entry.path).await?;
381            }
382            Ok(())
383        }).await;
384    }
385
386    async fn handle_scan_path(&self, req: InputPath<'_>) {
387        _ = maybe!(async move {
388            let InputPath {
389                entry,
390                settings,
391                snapshot,
392                should_scan_tx,
393            } = req;
394            if entry.is_dir() && entry.is_ignored {
395                if !settings.is_path_excluded(&entry.path) {
396                    // Self::scan_ignored_dir(
397                    //     self.fs,
398                    //     &snapshot,
399                    //     &entry.path,
400                    //     self.query,
401                    //     &filter_tx,
402                    //     &output_tx,
403                    // )
404                    // .await?;
405                }
406                return Ok(());
407            }
408
409            if entry.is_fifo || !entry.is_file() {
410                return Ok(());
411            }
412
413            if self.query.filters_path() {
414                let matched_path = if self.query.match_full_paths() {
415                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
416                    full_path.push(entry.path.as_std_path());
417                    self.query.match_path(&full_path)
418                } else {
419                    self.query.match_path(entry.path.as_std_path())
420                };
421                if !matched_path {
422                    return Ok(());
423                }
424            }
425
426            if self.open_entries.contains(&entry.id) {
427                // The buffer is already in memory and that's the version we want to scan;
428                // hence skip the dilly-dally and look for all matches straight away.
429                self.get_buffer_for_full_scan_tx
430                    .send(ProjectPath {
431                        worktree_id: snapshot.id(),
432                        path: entry.path.clone(),
433                    })
434                    .await?;
435            } else {
436                self.confirm_contents_will_match_tx
437                    .send(MatchingEntry {
438                        should_scan_tx: should_scan_tx,
439                        worktree_root: snapshot.abs_path().clone(),
440                        path: ProjectPath {
441                            worktree_id: snapshot.id(),
442                            path: entry.path.clone(),
443                        },
444                    })
445                    .await?;
446            }
447
448            anyhow::Ok(())
449        })
450        .await;
451    }
452}
453
/// A worktree entry handed to the workers for path-level filtering.
struct InputPath<'worker> {
    /// The worktree entry under consideration.
    entry: &'worker Entry,
    /// Settings of the worktree the entry belongs to (consulted for path exclusion).
    settings: &'worker WorktreeSettings,
    /// Snapshot of the worktree the entry belongs to.
    snapshot: &'worker Snapshot,
    /// Oneshot used to report the entry's path if its contents turn out to match.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
460
/// A file that passed path-level filtering and now needs its on-disk contents checked
/// for at least one match.
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path` to locate the file on disk.
    worktree_root: Arc<Path>,
    /// Project-relative identity of the file.
    path: ProjectPath,
    /// Oneshot used to report `path` once a match has been confirmed.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}