project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use collections::HashSet;
 12use fs::Fs;
 13use futures::{SinkExt, StreamExt, select_biased};
 14use gpui::{App, AsyncApp, Entity};
 15use language::{Buffer, BufferSnapshot};
 16use postage::oneshot;
 17use smol::channel::{Receiver, Sender, bounded, unbounded};
 18
 19use util::{ResultExt, maybe};
 20use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
 21
 22use crate::{
 23    ProjectItem, ProjectPath,
 24    buffer_store::BufferStore,
 25    search::{SearchQuery, SearchResult},
 26};
 27
 28pub(crate) struct ProjectSearcher {
 29    pub(crate) fs: Arc<dyn Fs>,
 30    pub(crate) buffer_store: Entity<BufferStore>,
 31    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
 32    pub(crate) open_buffers: HashSet<ProjectEntryId>,
 33}
 34
 35const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 36const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 37
 38impl ProjectSearcher {
 39    pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
 40        let mut open_buffers = HashSet::default();
 41        let mut unnamed_buffers = Vec::new();
 42
 43        let buffers = self.buffer_store.read(cx);
 44        for handle in buffers.buffers() {
 45            let buffer = handle.read(cx);
 46            if !buffers.is_searchable(&buffer.remote_id()) {
 47                continue;
 48            } else if let Some(entry_id) = buffer.entry_id(cx) {
 49                open_buffers.insert(entry_id);
 50            } else {
 51                // limit = limit.saturating_sub(1); todo!()
 52                unnamed_buffers.push(handle)
 53            };
 54        }
 55        let executor = cx.background_executor().clone();
 56        let (tx, rx) = unbounded();
 57        cx.spawn(async move |cx| {
 58            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 59            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
 60            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 61                bounded(MAX_CONCURRENT_BUFFER_OPENS);
 62            let matches_count = AtomicUsize::new(0);
 63            let matched_buffer_count = AtomicUsize::new(0);
 64            let worker_pool = executor.scoped(|scope| {
 65                let (input_paths_tx, input_paths_rx) = bounded(64);
 66                let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64);
 67                let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
 68                for _ in 0..executor.num_cpus() {
 69                    let worker = Worker {
 70                        query: &query,
 71                        open_buffers: &self.open_buffers,
 72                        matched_buffer_count: &matched_buffer_count,
 73                        matches_count: &matches_count,
 74                        fs: &*self.fs,
 75                        input_paths_rx: input_paths_rx.clone(),
 76                        confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
 77                        confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
 78                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
 79                        find_all_matches_rx: find_all_matches_rx.clone(),
 80                        publish_matches: tx.clone(),
 81                    };
 82                    scope.spawn(worker.run());
 83                }
 84                scope.spawn(self.provide_search_paths(
 85                    &query,
 86                    input_paths_tx,
 87                    sorted_search_results_tx,
 88                ));
 89                scope.spawn(self.maintain_sorted_search_results(
 90                    sorted_search_results_rx,
 91                    get_buffer_for_full_scan_tx,
 92                ))
 93            });
 94            let open_buffers =
 95                self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx);
 96            futures::future::join(worker_pool, open_buffers).await;
 97
 98            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
 99                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
100            if limit_reached {
101                _ = tx.send(SearchResult::LimitReached).await;
102            }
103        })
104        .detach();
105        rx
106    }
107
108    async fn provide_search_paths<'this>(
109        &'this self,
110        query: &SearchQuery,
111        tx: Sender<InputPath<'this>>,
112        results: Sender<oneshot::Receiver<ProjectPath>>,
113    ) {
114        for (snapshot, worktree_settings) in &self.snapshots {
115            for entry in snapshot.entries(query.include_ignored(), 0) {
116                let (should_scan_tx, should_scan_rx) = oneshot::channel();
117                let Ok(_) = tx
118                    .send(InputPath {
119                        entry,
120                        settings: worktree_settings,
121                        snapshot: snapshot,
122                        should_scan_tx,
123                    })
124                    .await
125                else {
126                    return;
127                };
128                if results.send(should_scan_rx).await.is_err() {
129                    return;
130                };
131            }
132        }
133    }
134
135    async fn maintain_sorted_search_results(
136        &self,
137        rx: Receiver<oneshot::Receiver<ProjectPath>>,
138        paths_for_full_scan: Sender<ProjectPath>,
139    ) {
140        let mut rx = pin!(rx);
141        while let Some(mut next_path_result) = rx.next().await {
142            let Some(successful_path) = next_path_result.next().await else {
143                // This math did not produce a match, hence skip it.
144                continue;
145            };
146            if paths_for_full_scan.send(successful_path).await.is_err() {
147                return;
148            };
149        }
150    }
151
152    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
153    async fn open_buffers<'a>(
154        &'a self,
155        rx: Receiver<ProjectPath>,
156        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
157        cx: &mut AsyncApp,
158    ) {
159        _ = maybe!(async move {
160            while let Ok(requested_path) = rx.recv().await {
161                let Some(buffer) = self
162                    .buffer_store
163                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
164                    .await
165                    .log_err()
166                else {
167                    continue;
168                };
169                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
170                find_all_matches_tx.send((buffer, snapshot)).await?;
171            }
172            Result::<_, anyhow::Error>::Ok(())
173        })
174        .await;
175    }
176}
177
178struct Worker<'search> {
179    query: &'search SearchQuery,
180    matched_buffer_count: &'search AtomicUsize,
181    matches_count: &'search AtomicUsize,
182    open_buffers: &'search HashSet<ProjectEntryId>,
183    fs: &'search dyn Fs,
184    /// Start off with all paths in project and filter them based on:
185    /// - Include filters
186    /// - Exclude filters
187    /// - Only open buffers
188    /// - Scan ignored files
189    /// Put another way: filter out files that can't match (without looking at file contents)
190    input_paths_rx: Receiver<InputPath<'search>>,
191
192    /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
193    /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
194    confirm_contents_will_match_tx: Sender<MatchingEntry>,
195    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
196    /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
197    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
198    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
199    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
200    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
201    /// Cool, we have results; let's share them with the world.
202    publish_matches: Sender<SearchResult>,
203}
204
205impl Worker<'_> {
206    async fn run(self) {
207        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
208        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
209        let mut scan_path = pin!(self.input_paths_rx.fuse());
210        let handler = RequestHandler {
211            query: self.query,
212            open_entries: &self.open_buffers,
213            fs: self.fs,
214            matched_buffer_count: self.matched_buffer_count,
215            matches_count: self.matches_count,
216            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
217            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
218            publish_matches: &self.publish_matches,
219        };
220
221        loop {
222            select_biased! {
223                find_all_matches = find_all_matches.next() => {
224                    let Some(matches) = find_all_matches else {
225                        self.publish_matches.close();
226                        continue;
227                        };
228                    let result = handler.handle_find_all_matches(matches).await;
229                    if let Some(_should_bail) = result {
230                        self.publish_matches.close();
231                        break;
232                    }
233                },
234                find_first_match = find_first_match.next() => {
235                    if let Some(buffer_with_at_least_one_match) = find_first_match {
236                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
237                    } else {
238                        self.get_buffer_for_full_scan_tx.close();
239                    }
240
241                },
242                scan_path = scan_path.next() => {
243                    if let Some(path_to_scan) = scan_path {
244                        handler.handle_scan_path(path_to_scan).await;
245                    } else {
246                        self.confirm_contents_will_match_tx.close();
247                    }
248
249                 }
250                 complete => break,
251            }
252        }
253    }
254}
255
256struct RequestHandler<'worker> {
257    query: &'worker SearchQuery,
258    fs: &'worker dyn Fs,
259    open_entries: &'worker HashSet<ProjectEntryId>,
260    matched_buffer_count: &'worker AtomicUsize,
261    matches_count: &'worker AtomicUsize,
262
263    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
264    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
265    publish_matches: &'worker Sender<SearchResult>,
266}
267
/// Marker returned by the full-scan stage once a search result limit has been exceeded.
struct LimitReached;
269
270impl RequestHandler<'_> {
271    async fn handle_find_all_matches(
272        &self,
273        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
274    ) -> Option<LimitReached> {
275        let ranges = self
276            .query
277            .search(&snapshot, None)
278            .await
279            .iter()
280            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
281            .collect::<Vec<_>>();
282
283        let matched_ranges = ranges.len();
284        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
285            || self
286                .matches_count
287                .fetch_add(matched_ranges, Ordering::Release)
288                > MAX_SEARCH_RESULT_RANGES
289        {
290            Some(LimitReached)
291        } else {
292            _ = self
293                .publish_matches
294                .send(SearchResult::Buffer { buffer, ranges })
295                .await;
296            None
297        }
298    }
299    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
300        _=maybe!(async move {
301            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
302            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
303                return anyhow::Ok(());
304            };
305
306            let mut file = BufReader::new(file);
307            let file_start = file.fill_buf()?;
308
309            if let Err(Some(starting_position)) =
310            std::str::from_utf8(file_start).map_err(|e| e.error_len())
311            {
312                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
313                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
314                log::debug!(
315                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
316                );
317                return Ok(());
318            }
319
320            if self.query.detect(file).unwrap_or(false) {
321                // Yes, we should scan the whole file.
322                entry.should_scan_tx.send(entry.path).await?;
323            }
324            Ok(())
325        }).await;
326    }
327
328    async fn handle_scan_path(&self, req: InputPath<'_>) {
329        _ = maybe!(async move {
330            let InputPath {
331                entry,
332                settings,
333                snapshot,
334                should_scan_tx,
335            } = req;
336            if entry.is_dir() && entry.is_ignored {
337                if !settings.is_path_excluded(&entry.path) {
338                    // Self::scan_ignored_dir(
339                    //     self.fs,
340                    //     &snapshot,
341                    //     &entry.path,
342                    //     self.query,
343                    //     &filter_tx,
344                    //     &output_tx,
345                    // )
346                    // .await?;
347                }
348                return Ok(());
349            }
350
351            if entry.is_fifo || !entry.is_file() {
352                return Ok(());
353            }
354
355            if self.query.filters_path() {
356                let matched_path = if self.query.match_full_paths() {
357                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
358                    full_path.push(entry.path.as_std_path());
359                    self.query.match_path(&full_path)
360                } else {
361                    self.query.match_path(entry.path.as_std_path())
362                };
363                if !matched_path {
364                    return Ok(());
365                }
366            }
367
368            if self.open_entries.contains(&entry.id) {
369                // The buffer is already in memory and that's the version we want to scan;
370                // hence skip the dilly-dally and look for all matches straight away.
371                self.get_buffer_for_full_scan_tx
372                    .send(ProjectPath {
373                        worktree_id: snapshot.id(),
374                        path: entry.path.clone(),
375                    })
376                    .await?;
377            } else {
378                self.confirm_contents_will_match_tx
379                    .send(MatchingEntry {
380                        should_scan_tx: should_scan_tx,
381                        worktree_root: snapshot.abs_path().clone(),
382                        path: ProjectPath {
383                            worktree_id: snapshot.id(),
384                            path: entry.path.clone(),
385                        },
386                    })
387                    .await?;
388            }
389
390            anyhow::Ok(())
391        })
392        .await;
393    }
394}
395
396struct InputPath<'worker> {
397    entry: &'worker Entry,
398    settings: &'worker WorktreeSettings,
399    snapshot: &'worker Snapshot,
400    should_scan_tx: oneshot::Sender<ProjectPath>,
401}
402
403struct MatchingEntry {
404    worktree_root: Arc<Path>,
405    path: ProjectPath,
406    should_scan_tx: oneshot::Sender<ProjectPath>,
407}