project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use collections::HashSet;
 12use fs::Fs;
 13use futures::{SinkExt, StreamExt, select_biased};
 14use gpui::{App, AsyncApp, Entity, Task};
 15use language::{Buffer, BufferSnapshot};
 16use postage::oneshot;
 17use smol::channel::{Receiver, Sender, bounded, unbounded};
 18
 19use util::{ResultExt, maybe};
 20use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
 21
 22use crate::{
 23    ProjectItem, ProjectPath,
 24    buffer_store::BufferStore,
 25    search::{SearchQuery, SearchResult},
 26};
 27
/// Everything needed to run one project-wide search.
///
/// Consumed by [`Search::into_results`], which wires up the search pipeline.
pub(crate) struct Search {
    /// Filesystem handle; workers use it to open on-disk files when checking whether
    /// a file that is not loaded as a buffer contains at least one match.
    pub(crate) fs: Arc<dyn Fs>,
    /// Store that is queried for the currently open buffers and asked to open buffers
    /// for paths selected for a full scan.
    pub(crate) buffer_store: Entity<BufferStore>,
    /// Worktree snapshots, each paired with its settings; their entries are the candidate paths.
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
}
 33
/// Threshold for the number of buffers that went through a full scan. Once the running
/// count exceeds it, workers stop publishing results and the search reports
/// `SearchResult::LimitReached`.
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
/// Threshold for the total number of match ranges found across all buffers. Once the
/// running count exceeds it, workers stop publishing results and the search reports
/// `SearchResult::LimitReached`.
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 36
/// Represents results of project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search.
///
/// The search itself does not start until one of the consuming accessors
/// ([`SearchResultsHandle::results`] or [`SearchResultsHandle::matching_buffers`]) is called.
#[must_use]
pub(crate) struct SearchResultsHandle {
    /// Receives published search results (buffers with their match ranges, or `LimitReached`).
    results: Receiver<SearchResult>,
    /// Receives handles of buffers queued for a full scan; this is a clone of the receiver
    /// that the snapshot-grabbing stage of the pipeline reads from.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// Deferred pipeline start-up; invoked exactly once by whichever accessor consumes the handle.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
 45
 46impl SearchResultsHandle {
 47    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
 48        (self.trigger_search)(cx).detach();
 49        self.results
 50    }
 51    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
 52        (self.trigger_search)(cx).detach();
 53        self.matching_buffers
 54    }
 55}
 56
 57impl Search {
    /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
    /// or full search results.
    ///
    /// Nothing is searched until the returned handle is consumed: the whole pipeline lives inside
    /// the `trigger_search` closure. When invoked, that closure spawns a task which
    /// 1. queues all open buffers that have no project entry ("unnamed" buffers) for snapshotting,
    /// 2. starts one `Worker` per CPU on the background executor, along with a task that feeds
    ///    worktree entries to the workers and a task that forwards confirmed paths in order,
    /// 3. concurrently opens requested buffers and grabs buffer snapshots for the workers,
    /// 4. once everything has finished, publishes `SearchResult::LimitReached` if either counter
    ///    went past its limit.
    pub(crate) fn into_results(self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        // Entry ids of searchable open buffers that are backed by a project entry.
        let mut open_buffers = HashSet::default();
        // Searchable open buffers without a project entry; they can't be found by walking worktrees.
        let mut unnamed_buffers = Vec::new();
        // Capacity of the channel that hands (buffer, snapshot) pairs to the workers.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // TODO: presumably unnamed buffers should count against a result limit
                // (earlier sketch: `limit = limit.saturating_sub(1)`) — not implemented yet.
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
        // Channel on which final search results are published.
        let (tx, rx) = unbounded();
        // Channel of buffers that are waiting to have a snapshot taken for a full scan.
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        // The handle exposes a clone of the receiving end for callers that only want buffers.
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(|cx: &mut App| {
            cx.spawn(async move |cx| {
                // Unnamed buffers skip path filtering and first-match detection entirely.
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                // (buffer, snapshot) pairs ready for a full scan by a worker.
                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);

                // Paths for which a buffer has to be opened before the full scan.
                let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
                // Counters shared by all workers to enforce the result limits.
                let matches_count = AtomicUsize::new(0);
                let matched_buffer_count = AtomicUsize::new(0);
                let worker_pool = executor.scoped(|scope| {
                    let (input_paths_tx, input_paths_rx) = bounded(64);
                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                        bounded(64);
                    let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                    for _ in 0..executor.num_cpus() {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &open_buffers,
                            matched_buffer_count: &matched_buffer_count,
                            matches_count: &matches_count,
                            fs: &*self.fs,
                            input_paths_rx: input_paths_rx.clone(),
                            confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
                            confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                            get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                            publish_matches: tx.clone(),
                        };
                        scope.spawn(worker.run());
                    }
                    // Producer: walks every worktree entry and hands it to the workers.
                    scope.spawn(self.provide_search_paths(
                        &query,
                        input_paths_tx,
                        sorted_search_results_tx,
                    ));
                    // Forwards confirmed paths to the buffer-opening stage in entry order.
                    scope.spawn(self.maintain_sorted_search_results(
                        sorted_search_results_rx,
                        get_buffer_for_full_scan_tx,
                    ))
                });
                // These two stages need app access (`cx`), so they run in this task
                // rather than inside the scoped background pool.
                let open_buffers = self.open_buffers(
                    get_buffer_for_full_scan_rx,
                    grab_buffer_snapshot_tx,
                    cx.clone(),
                );
                let buffer_snapshots = self.grab_buffer_snapshots(
                    grab_buffer_snapshot_rx,
                    find_all_matches_tx,
                    cx.clone(),
                );
                futures::future::join3(worker_pool, buffer_snapshots, open_buffers).await;

                // Let the consumer know that the result set may be incomplete.
                let limit_reached = matches_count.load(Ordering::Acquire)
                    > MAX_SEARCH_RESULT_RANGES
                    || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
                if limit_reached {
                    _ = tx.send(SearchResult::LimitReached).await;
                }
            })
        });
        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }
149
150    async fn provide_search_paths<'this>(
151        &'this self,
152        query: &SearchQuery,
153        tx: Sender<InputPath<'this>>,
154        results: Sender<oneshot::Receiver<ProjectPath>>,
155    ) {
156        for (snapshot, worktree_settings) in &self.snapshots {
157            for entry in snapshot.entries(query.include_ignored(), 0) {
158                let (should_scan_tx, should_scan_rx) = oneshot::channel();
159                let Ok(_) = tx
160                    .send(InputPath {
161                        entry,
162                        settings: worktree_settings,
163                        snapshot: snapshot,
164                        should_scan_tx,
165                    })
166                    .await
167                else {
168                    return;
169                };
170                if results.send(should_scan_rx).await.is_err() {
171                    return;
172                };
173            }
174        }
175    }
176
177    async fn maintain_sorted_search_results(
178        &self,
179        rx: Receiver<oneshot::Receiver<ProjectPath>>,
180        paths_for_full_scan: Sender<ProjectPath>,
181    ) {
182        let mut rx = pin!(rx);
183        while let Some(mut next_path_result) = rx.next().await {
184            let Some(successful_path) = next_path_result.next().await else {
185                // This math did not produce a match, hence skip it.
186                continue;
187            };
188            if paths_for_full_scan.send(successful_path).await.is_err() {
189                return;
190            };
191        }
192    }
193
194    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
195    async fn open_buffers(
196        &self,
197        rx: Receiver<ProjectPath>,
198        find_all_matches_tx: Sender<Entity<Buffer>>,
199        mut cx: AsyncApp,
200    ) {
201        _ = maybe!(async move {
202            while let Ok(requested_path) = rx.recv().await {
203                let Some(buffer) = self
204                    .buffer_store
205                    .update(&mut cx, |this, cx| this.open_buffer(requested_path, cx))?
206                    .await
207                    .log_err()
208                else {
209                    continue;
210                };
211                find_all_matches_tx.send(buffer).await?;
212            }
213            Result::<_, anyhow::Error>::Ok(())
214        })
215        .await;
216    }
217
218    async fn grab_buffer_snapshots<'a>(
219        &'a self,
220        rx: Receiver<Entity<Buffer>>,
221        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
222        mut cx: AsyncApp,
223    ) {
224        _ = maybe!(async move {
225            while let Ok(buffer) = rx.recv().await {
226                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
227                find_all_matches_tx.send((buffer, snapshot)).await?;
228            }
229            Result::<_, anyhow::Error>::Ok(())
230        })
231        .await;
232    }
233}
234
/// A single background worker of the search pipeline.
///
/// Every worker listens on all pipeline stages at once (see `Worker::run`) and handles
/// whichever request is available, preferring later stages over earlier ones.
struct Worker<'search> {
    /// The query being searched for.
    query: &'search SearchQuery,
    /// Shared count of buffers that went through a full scan; checked against `MAX_SEARCH_RESULT_FILES`.
    matched_buffer_count: &'search AtomicUsize,
    /// Shared count of match ranges found so far; checked against `MAX_SEARCH_RESULT_RANGES`.
    matches_count: &'search AtomicUsize,
    /// Project entries that already have a buffer open; these skip the on-disk first-match check.
    open_buffers: &'search HashSet<ProjectEntryId>,
    /// Filesystem used to read file contents for the first-match check.
    fs: &'search dyn Fs,
    /// Start off with all paths in project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents)
    input_paths_rx: Receiver<InputPath<'search>>,

    /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
    /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    /// Receiving end of the stage above; every worker both feeds and drains this channel.
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}
261
262impl Worker<'_> {
263    async fn run(mut self) {
264        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
265        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
266        let mut scan_path = pin!(self.input_paths_rx.fuse());
267
268        loop {
269            let handler = RequestHandler {
270                query: self.query,
271                open_entries: &self.open_buffers,
272                fs: self.fs,
273                matched_buffer_count: self.matched_buffer_count,
274                matches_count: self.matches_count,
275                confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
276                get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
277                publish_matches: &self.publish_matches,
278            };
279            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
280            // steps straight away. Another worker might be about to produce a value that will
281            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
282            // That way, we'll only ever close a next-stage channel when ALL workers do so.
283            select_biased! {
284                find_all_matches = find_all_matches.next() => {
285                    let Some(matches) = find_all_matches else {
286                        self.publish_matches = bounded(1).0;
287                        continue;
288                    };
289                    let result = handler.handle_find_all_matches(matches).await;
290                    if let Some(_should_bail) = result {
291                        self.publish_matches = bounded(1).0;
292                        break;
293                    }
294                },
295                find_first_match = find_first_match.next() => {
296                    if let Some(buffer_with_at_least_one_match) = find_first_match {
297                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
298                    } else {
299                        self.get_buffer_for_full_scan_tx = bounded(1).0;
300                    }
301
302                },
303                scan_path = scan_path.next() => {
304                    if let Some(path_to_scan) = scan_path {
305                        handler.handle_scan_path(path_to_scan).await;
306                    } else {
307                        // If we're the last worker to notice that this is not producing values, close the upstream.
308                        self.confirm_contents_will_match_tx = bounded(1).0;
309                    }
310
311                 }
312                 complete => {
313                     break
314                },
315
316            }
317        }
318    }
319}
320
/// Borrowed view of a `Worker`'s state that implements the actual handling of each
/// pipeline stage. `Worker::run` builds a fresh one for every loop iteration.
struct RequestHandler<'worker> {
    /// The query being searched for.
    query: &'worker SearchQuery,
    /// Filesystem used to read file contents for the first-match check.
    fs: &'worker dyn Fs,
    /// Project entries that already have a buffer open.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Shared count of buffers that went through a full scan.
    matched_buffer_count: &'worker AtomicUsize,
    /// Shared count of match ranges found so far.
    matches_count: &'worker AtomicUsize,

    /// Destination for entries whose on-disk contents still have to be checked for a match.
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    /// Destination for paths whose buffer should be opened and fully scanned.
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    /// Destination for finished search results.
    publish_matches: &'worker Sender<SearchResult>,
}
332
/// Marker returned by `RequestHandler::handle_find_all_matches` when one of the result
/// limits has been exceeded and the calling worker should stop.
struct LimitReached;
334
335impl RequestHandler<'_> {
336    async fn handle_find_all_matches(
337        &self,
338        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
339    ) -> Option<LimitReached> {
340        let ranges = self
341            .query
342            .search(&snapshot, None)
343            .await
344            .iter()
345            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
346            .collect::<Vec<_>>();
347
348        let matched_ranges = ranges.len();
349        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
350            || self
351                .matches_count
352                .fetch_add(matched_ranges, Ordering::Release)
353                > MAX_SEARCH_RESULT_RANGES
354        {
355            Some(LimitReached)
356        } else {
357            _ = self
358                .publish_matches
359                .send(SearchResult::Buffer { buffer, ranges })
360                .await;
361            None
362        }
363    }
364    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
365        _=maybe!(async move {
366            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
367            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
368                return anyhow::Ok(());
369            };
370
371            let mut file = BufReader::new(file);
372            let file_start = file.fill_buf()?;
373
374            if let Err(Some(starting_position)) =
375            std::str::from_utf8(file_start).map_err(|e| e.error_len())
376            {
377                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
378                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
379                log::debug!(
380                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
381                );
382                return Ok(());
383            }
384
385            if self.query.detect(file).unwrap_or(false) {
386                // Yes, we should scan the whole file.
387                entry.should_scan_tx.send(entry.path).await?;
388            }
389            Ok(())
390        }).await;
391    }
392
393    async fn handle_scan_path(&self, req: InputPath<'_>) {
394        _ = maybe!(async move {
395            let InputPath {
396                entry,
397                settings,
398                snapshot,
399                should_scan_tx,
400            } = req;
401            if entry.is_dir() && entry.is_ignored {
402                if !settings.is_path_excluded(&entry.path) {
403                    // Self::scan_ignored_dir(
404                    //     self.fs,
405                    //     &snapshot,
406                    //     &entry.path,
407                    //     self.query,
408                    //     &filter_tx,
409                    //     &output_tx,
410                    // )
411                    // .await?;
412                }
413                return Ok(());
414            }
415
416            if entry.is_fifo || !entry.is_file() {
417                return Ok(());
418            }
419
420            if self.query.filters_path() {
421                let matched_path = if self.query.match_full_paths() {
422                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
423                    full_path.push(entry.path.as_std_path());
424                    self.query.match_path(&full_path)
425                } else {
426                    self.query.match_path(entry.path.as_std_path())
427                };
428                if !matched_path {
429                    return Ok(());
430                }
431            }
432
433            if self.open_entries.contains(&entry.id) {
434                // The buffer is already in memory and that's the version we want to scan;
435                // hence skip the dilly-dally and look for all matches straight away.
436                self.get_buffer_for_full_scan_tx
437                    .send(ProjectPath {
438                        worktree_id: snapshot.id(),
439                        path: entry.path.clone(),
440                    })
441                    .await?;
442            } else {
443                self.confirm_contents_will_match_tx
444                    .send(MatchingEntry {
445                        should_scan_tx: should_scan_tx,
446                        worktree_root: snapshot.abs_path().clone(),
447                        path: ProjectPath {
448                            worktree_id: snapshot.id(),
449                            path: entry.path.clone(),
450                        },
451                    })
452                    .await?;
453            }
454
455            anyhow::Ok(())
456        })
457        .await;
458    }
459}
460
/// A worktree entry submitted to the workers for path-level filtering.
struct InputPath<'worker> {
    /// The entry to examine.
    entry: &'worker Entry,
    /// Settings of the worktree the entry belongs to; consulted for path exclusion.
    settings: &'worker WorktreeSettings,
    /// Snapshot of the worktree the entry belongs to.
    snapshot: &'worker Snapshot,
    /// Used to confirm that the path should be fully scanned; dropping it without sending
    /// signals that the path did not match.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
467
/// An entry that passed path-level filtering and now needs its on-disk contents checked
/// for at least one match.
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path` to locate the file on disk.
    worktree_root: Arc<Path>,
    /// Project-relative location of the file.
    path: ProjectPath,
    /// Used to confirm that the path should be fully scanned; dropping it without sending
    /// signals that the file did not match.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}