project_search.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::Path,
  4    pin::pin,
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9};
 10
 11use anyhow::Context;
 12use collections::HashSet;
 13use fs::Fs;
 14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
 15use gpui::{App, AppContext, AsyncApp, Entity, Task};
 16use language::{Buffer, BufferSnapshot};
 17use parking_lot::Mutex;
 18use postage::oneshot;
 19use rpc::{AnyProtoClient, proto};
 20use smol::{
 21    channel::{Receiver, Sender, bounded, unbounded},
 22    future::FutureExt,
 23};
 24
 25use text::BufferId;
 26use util::{ResultExt, maybe};
 27use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
 28
 29use crate::{
 30    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
 31    buffer_store::BufferStore,
 32    search::{SearchQuery, SearchResult},
 33    worktree_store::WorktreeStore,
 34};
 35
/// A configured project-wide search that has not started running yet.
///
/// Turn it into a [`SearchResultsHandle`] with [`Search::into_results`].
pub struct Search {
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    /// Maximum number of candidate paths forwarded for a full scan in local searches;
    /// also sent to the host as the request limit in remote searches.
    limit: usize,
    /// Whether candidates come from local worktrees or from a remote host.
    kind: SearchKind,
}
 42
/// Where the candidates of a [`Search`] come from.
enum SearchKind {
    /// Walk the given worktrees, reading not-yet-open files through `fs`.
    Local {
        fs: Arc<dyn Fs>,
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Ask the host of remote project `remote_id` (via `client`) for candidate buffers.
    /// `models` is used to retain remotely created models while that request is in flight.
    Remote {
        client: AnyProtoClient,
        remote_id: u64,
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
}
 54
/// Represents results of project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search.
///
/// Nothing runs until one of [`SearchResultsHandle::results`] or
/// [`SearchResultsHandle::matching_buffers`] is called; both consume the handle.
#[must_use]
pub struct SearchResultsHandle {
    /// Full results: matching buffers together with their match ranges.
    results: Receiver<SearchResult>,
    /// Buffers that may match, without match positions.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// Spawns the actual search; invoked by whichever accessor consumes the handle.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
 63
 64impl SearchResultsHandle {
 65    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
 66        (self.trigger_search)(cx).detach();
 67        self.results
 68    }
 69    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
 70        (self.trigger_search)(cx).detach();
 71        self.matching_buffers
 72    }
 73}
 74
/// Channels of the search pipeline as seen by a single worker (it is cloned once per worker).
///
/// `Remote` carries nothing: remote candidates arrive as ready-made buffers instead of paths.
#[derive(Clone)]
enum FindSearchCandidates {
    Local {
        /// Used to read files that are not open yet when confirming a first match on disk.
        fs: Arc<dyn Fs>,
        /// Start off with all paths in project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        /// Put another way: filter out files that can't match (without looking at file contents).
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we figure out whether it contains at least
        /// one match based on its disk contents. This step is skipped for buffers already in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
        /// Of those that contain at least one match (or are already in memory), look for the rest
        /// of the matches (and figure out their ranges).
        /// But first, we need to go back to the main thread to open a buffer (& create an entity for it).
        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    },
    Remote,
}
 96
 97impl Search {
 98    pub fn local(
 99        fs: Arc<dyn Fs>,
100        buffer_store: Entity<BufferStore>,
101        worktree_store: Entity<WorktreeStore>,
102
103        limit: usize,
104        cx: &mut App,
105    ) -> Self {
106        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
107        Self {
108            kind: SearchKind::Local { fs, worktrees },
109            buffer_store,
110            worktree_store,
111            limit,
112        }
113    }
114
115    pub(crate) fn remote(
116        buffer_store: Entity<BufferStore>,
117        worktree_store: Entity<WorktreeStore>,
118        limit: usize,
119        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
120    ) -> Self {
121        Self {
122            kind: SearchKind::Remote {
123                client: client_state.0,
124                remote_id: client_state.1,
125                models: client_state.2,
126            },
127            buffer_store,
128            worktree_store,
129            limit,
130        }
131    }
132
133    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
134    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
135    /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
136    /// or full search results.
137    pub fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
138        let mut open_buffers = HashSet::default();
139        let mut unnamed_buffers = Vec::new();
140        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
141        let buffers = self.buffer_store.read(cx);
142        for handle in buffers.buffers() {
143            let buffer = handle.read(cx);
144            if !buffers.is_searchable(&buffer.remote_id()) {
145                continue;
146            } else if let Some(entry_id) = buffer.entry_id(cx) {
147                open_buffers.insert(entry_id);
148            } else {
149                self.limit -= self.limit.saturating_sub(1);
150                unnamed_buffers.push(handle)
151            };
152        }
153        let executor = cx.background_executor().clone();
154        let (tx, rx) = unbounded();
155        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
156        let matching_buffers = grab_buffer_snapshot_rx.clone();
157        let trigger_search = Box::new(move |cx: &mut App| {
158            cx.spawn(async move |cx| {
159                for buffer in unnamed_buffers {
160                    _ = grab_buffer_snapshot_tx.send(buffer).await;
161                }
162
163                let (find_all_matches_tx, find_all_matches_rx) =
164                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
165
166                let (candidate_searcher, tasks) = match self.kind {
167                    SearchKind::Local {
168                        fs,
169                        ref mut worktrees,
170                    } => {
171                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
172                            unbounded();
173                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
174                            bounded(64);
175                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
176
177                        let (input_paths_tx, input_paths_rx) = unbounded();
178
179                        let tasks = vec![
180                            cx.spawn(Self::provide_search_paths(
181                                std::mem::take(worktrees),
182                                query.include_ignored(),
183                                input_paths_tx,
184                                sorted_search_results_tx,
185                            ))
186                            .boxed_local(),
187                            Self::open_buffers(
188                                &self.buffer_store,
189                                get_buffer_for_full_scan_rx,
190                                grab_buffer_snapshot_tx,
191                                cx.clone(),
192                            )
193                            .boxed_local(),
194                            cx.background_spawn(Self::maintain_sorted_search_results(
195                                sorted_search_results_rx,
196                                get_buffer_for_full_scan_tx.clone(),
197                                self.limit,
198                            ))
199                            .boxed_local(),
200                        ];
201                        (
202                            FindSearchCandidates::Local {
203                                fs,
204                                get_buffer_for_full_scan_tx,
205                                confirm_contents_will_match_tx,
206                                confirm_contents_will_match_rx,
207                                input_paths_rx,
208                            },
209                            tasks,
210                        )
211                    }
212                    SearchKind::Remote {
213                        client,
214                        remote_id,
215                        models,
216                    } => {
217                        let request = client.request(proto::FindSearchCandidates {
218                            project_id: remote_id,
219                            query: Some(query.to_proto()),
220                            limit: self.limit as _,
221                        });
222                        let Ok(guard) = cx.update(|cx| {
223                            Project::retain_remotely_created_models_impl(
224                                &models,
225                                &self.buffer_store,
226                                &self.worktree_store,
227                                cx,
228                            )
229                        }) else {
230                            return;
231                        };
232                        let buffer_store = self.buffer_store.downgrade();
233                        let issue_remote_buffers_request = cx
234                            .spawn(async move |cx| {
235                                let _ = maybe!(async move {
236                                    let response = request.await?;
237
238                                    for buffer_id in response.buffer_ids {
239                                        let buffer_id = BufferId::new(buffer_id)?;
240                                        let buffer = buffer_store
241                                            .update(cx, |buffer_store, cx| {
242                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
243                                            })?
244                                            .await?;
245                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
246                                    }
247
248                                    drop(guard);
249                                    anyhow::Ok(())
250                                })
251                                .await
252                                .log_err();
253                            })
254                            .boxed_local();
255                        (
256                            FindSearchCandidates::Remote,
257                            vec![issue_remote_buffers_request],
258                        )
259                    }
260                };
261
262                let matches_count = AtomicUsize::new(0);
263                let matched_buffer_count = AtomicUsize::new(0);
264
265                let worker_pool = executor.scoped(|scope| {
266                    let num_cpus = executor.num_cpus();
267
268                    assert!(num_cpus > 0);
269                    for _ in 0..executor.num_cpus() - 1 {
270                        let worker = Worker {
271                            query: &query,
272                            open_buffers: &open_buffers,
273                            matched_buffer_count: &matched_buffer_count,
274                            matches_count: &matches_count,
275                            candidates: candidate_searcher.clone(),
276                            find_all_matches_rx: find_all_matches_rx.clone(),
277                            publish_matches: tx.clone(),
278                        };
279                        scope.spawn(worker.run());
280                    }
281                    drop(tx);
282                    drop(find_all_matches_rx);
283                    drop(candidate_searcher);
284                });
285
286                let buffer_snapshots = Self::grab_buffer_snapshots(
287                    grab_buffer_snapshot_rx,
288                    find_all_matches_tx,
289                    cx.clone(),
290                );
291                futures::future::join_all(
292                    [worker_pool.boxed_local(), buffer_snapshots.boxed_local()]
293                        .into_iter()
294                        .chain(tasks),
295                )
296                .await;
297            })
298        });
299
300        SearchResultsHandle {
301            results: rx,
302            matching_buffers,
303            trigger_search,
304        }
305    }
306
307    fn provide_search_paths(
308        worktrees: Vec<Entity<Worktree>>,
309        include_ignored: bool,
310        tx: Sender<InputPath>,
311        results: Sender<oneshot::Receiver<ProjectPath>>,
312    ) -> impl AsyncFnOnce(&mut AsyncApp) {
313        async move |cx| {
314            _ = maybe!(async move {
315                for worktree in worktrees {
316                    let (mut snapshot, worktree_settings) = worktree
317                        .read_with(cx, |this, _| {
318                            Some((this.snapshot(), this.as_local()?.settings()))
319                        })?
320                        .context("The worktree is not local")?;
321                    if include_ignored {
322                        // Pre-fetch all of the ignored directories as they're going to be searched.
323                        let mut entries_to_refresh = vec![];
324                        for entry in snapshot.entries(include_ignored, 0) {
325                            if entry.is_ignored && entry.kind.is_unloaded() {
326                                if !worktree_settings.is_path_excluded(&entry.path) {
327                                    entries_to_refresh.push(entry.path.clone());
328                                }
329                            }
330                        }
331                        let barrier = worktree.update(cx, |this, _| {
332                            let local = this.as_local_mut()?;
333                            let barrier = entries_to_refresh
334                                .into_iter()
335                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
336                                .collect::<Vec<_>>();
337                            Some(barrier)
338                        })?;
339                        if let Some(barriers) = barrier {
340                            futures::future::join_all(barriers).await;
341                        }
342                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
343                    }
344                    cx.background_executor()
345                        .scoped(|scope| {
346                            scope.spawn(async {
347                                for entry in snapshot.files(include_ignored, 0) {
348                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
349                                    let Ok(_) = tx
350                                        .send(InputPath {
351                                            entry: entry.clone(),
352                                            snapshot: snapshot.clone(),
353                                            should_scan_tx,
354                                        })
355                                        .await
356                                    else {
357                                        return;
358                                    };
359                                    if results.send(should_scan_rx).await.is_err() {
360                                        return;
361                                    };
362                                }
363                            })
364                        })
365                        .await;
366                }
367                anyhow::Ok(())
368            })
369            .await;
370        }
371    }
372
373    async fn maintain_sorted_search_results(
374        rx: Receiver<oneshot::Receiver<ProjectPath>>,
375        paths_for_full_scan: Sender<ProjectPath>,
376        limit: usize,
377    ) {
378        let mut rx = pin!(rx);
379        let mut matched = 0;
380        while let Some(mut next_path_result) = rx.next().await {
381            let Some(successful_path) = next_path_result.next().await else {
382                // This math did not produce a match, hence skip it.
383                continue;
384            };
385            if paths_for_full_scan.send(successful_path).await.is_err() {
386                return;
387            };
388            matched += 1;
389            if matched >= limit {
390                break;
391            }
392        }
393    }
394
395    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
396    async fn open_buffers(
397        buffer_store: &Entity<BufferStore>,
398        rx: Receiver<ProjectPath>,
399        find_all_matches_tx: Sender<Entity<Buffer>>,
400        mut cx: AsyncApp,
401    ) {
402        let mut rx = pin!(rx.ready_chunks(64));
403        _ = maybe!(async move {
404            while let Some(requested_paths) = rx.next().await {
405                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
406                    requested_paths
407                        .into_iter()
408                        .map(|path| this.open_buffer(path, cx))
409                        .collect::<FuturesOrdered<_>>()
410                })?;
411
412                while let Some(buffer) = buffers.next().await {
413                    if let Some(buffer) = buffer.log_err() {
414                        find_all_matches_tx.send(buffer).await?;
415                    }
416                }
417            }
418            Result::<_, anyhow::Error>::Ok(())
419        })
420        .await;
421    }
422
423    async fn grab_buffer_snapshots(
424        rx: Receiver<Entity<Buffer>>,
425        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
426        mut cx: AsyncApp,
427    ) {
428        _ = maybe!(async move {
429            while let Ok(buffer) = rx.recv().await {
430                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
431                find_all_matches_tx.send((buffer, snapshot)).await?;
432            }
433            Result::<_, anyhow::Error>::Ok(())
434        })
435        .await;
436    }
437}
438
/// One background worker of the search pipeline; it is driven by `Worker::run`.
struct Worker<'search> {
    query: &'search SearchQuery,
    /// Counter shared by all workers, checked against `Search::MAX_SEARCH_RESULT_FILES`.
    matched_buffer_count: &'search AtomicUsize,
    /// Counter shared by all workers, checked against `Search::MAX_SEARCH_RESULT_RANGES`.
    matches_count: &'search AtomicUsize,
    /// Worktree entries that already have an open buffer; those are searched in memory.
    open_buffers: &'search HashSet<ProjectEntryId>,
    candidates: FindSearchCandidates,
    /// Back in the background: run the full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Where found matches are published to the consumer of the search.
    publish_matches: Sender<SearchResult>,
}
450
451impl Worker<'_> {
452    async fn run(mut self) {
453        let (
454            input_paths_rx,
455            confirm_contents_will_match_rx,
456            mut confirm_contents_will_match_tx,
457            mut get_buffer_for_full_scan_tx,
458            fs,
459        ) = match self.candidates {
460            FindSearchCandidates::Local {
461                fs,
462                input_paths_rx,
463                confirm_contents_will_match_rx,
464                confirm_contents_will_match_tx,
465                get_buffer_for_full_scan_tx,
466            } => (
467                input_paths_rx,
468                confirm_contents_will_match_rx,
469                confirm_contents_will_match_tx,
470                get_buffer_for_full_scan_tx,
471                Some(fs),
472            ),
473            FindSearchCandidates::Remote => (
474                unbounded().1,
475                unbounded().1,
476                unbounded().0,
477                unbounded().0,
478                None,
479            ),
480        };
481        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
482        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
483        let mut scan_path = pin!(input_paths_rx.fuse());
484
485        loop {
486            let handler = RequestHandler {
487                query: self.query,
488                open_entries: &self.open_buffers,
489                fs: fs.as_deref(),
490                matched_buffer_count: self.matched_buffer_count,
491                matches_count: self.matches_count,
492                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
493                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
494                publish_matches: &self.publish_matches,
495            };
496            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
497            // steps straight away. Another worker might be about to produce a value that will
498            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
499            // That way, we'll only ever close a next-stage channel when ALL workers do so.
500            select_biased! {
501                find_all_matches = find_all_matches.next() => {
502
503                    if self.publish_matches.is_closed() {
504                        break;
505                    }
506                    let Some(matches) = find_all_matches else {
507                        self.publish_matches = bounded(1).0;
508                        continue;
509                    };
510                    let result = handler.handle_find_all_matches(matches).await;
511                    if let Some(_should_bail) = result {
512
513                        self.publish_matches = bounded(1).0;
514                        continue;
515                    }
516                },
517                find_first_match = find_first_match.next() => {
518                    if let Some(buffer_with_at_least_one_match) = find_first_match {
519                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
520                    } else {
521                        get_buffer_for_full_scan_tx = bounded(1).0;
522                    }
523
524                },
525                scan_path = scan_path.next() => {
526                    if let Some(path_to_scan) = scan_path {
527                        handler.handle_scan_path(path_to_scan).await;
528                    } else {
529                        // If we're the last worker to notice that this is not producing values, close the upstream.
530                        confirm_contents_will_match_tx = bounded(1).0;
531                    }
532
533                 }
534                 complete => {
535                     break
536                },
537
538            }
539        }
540    }
541}
542
/// Borrowed view of a worker's state, used to process a single pipeline request.
struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    /// `None` in remote searches, where no file system is available to the worker.
    fs: Option<&'worker dyn Fs>,
    /// Worktree entries that already have an open buffer.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Shared counter checked against `Search::MAX_SEARCH_RESULT_FILES`.
    matched_buffer_count: &'worker AtomicUsize,
    /// Shared counter checked against `Search::MAX_SEARCH_RESULT_RANGES`.
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}
554
/// Marker returned by `RequestHandler::handle_find_all_matches` once a search limit is hit.
struct LimitReached;
556
557impl RequestHandler<'_> {
558    async fn handle_find_all_matches(
559        &self,
560        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
561    ) -> Option<LimitReached> {
562        let ranges = self
563            .query
564            .search(&snapshot, None)
565            .await
566            .iter()
567            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
568            .collect::<Vec<_>>();
569
570        let matched_ranges = ranges.len();
571        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
572            > Search::MAX_SEARCH_RESULT_FILES
573            || self
574                .matches_count
575                .fetch_add(matched_ranges, Ordering::Release)
576                > Search::MAX_SEARCH_RESULT_RANGES
577        {
578            _ = self.publish_matches.send(SearchResult::LimitReached).await;
579            Some(LimitReached)
580        } else {
581            _ = self
582                .publish_matches
583                .send(SearchResult::Buffer { buffer, ranges })
584                .await;
585            None
586        }
587    }
588    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
589        _=maybe!(async move {
590            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
591            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
592                return anyhow::Ok(());
593            };
594
595            let mut file = BufReader::new(file);
596            let file_start = file.fill_buf()?;
597
598            if let Err(Some(starting_position)) =
599            std::str::from_utf8(file_start).map_err(|e| e.error_len())
600            {
601                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
602                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
603                log::debug!(
604                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
605                );
606                return Ok(());
607            }
608
609            if self.query.detect(file).unwrap_or(false) {
610                // Yes, we should scan the whole file.
611                entry.should_scan_tx.send(entry.path).await?;
612            }
613            Ok(())
614        }).await;
615    }
616
617    async fn handle_scan_path(&self, req: InputPath) {
618        _ = maybe!(async move {
619            let InputPath {
620                entry,
621
622                snapshot,
623                should_scan_tx,
624            } = req;
625
626            if entry.is_fifo || !entry.is_file() {
627                return Ok(());
628            }
629
630            if self.query.filters_path() {
631                let matched_path = if self.query.match_full_paths() {
632                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
633                    full_path.push(entry.path.as_std_path());
634                    self.query.match_path(&full_path)
635                } else {
636                    self.query.match_path(entry.path.as_std_path())
637                };
638                if !matched_path {
639                    return Ok(());
640                }
641            }
642
643            if self.open_entries.contains(&entry.id) {
644                // The buffer is already in memory and that's the version we want to scan;
645                // hence skip the dilly-dally and look for all matches straight away.
646                self.get_buffer_for_full_scan_tx
647                    .send(ProjectPath {
648                        worktree_id: snapshot.id(),
649                        path: entry.path.clone(),
650                    })
651                    .await?;
652            } else {
653                self.confirm_contents_will_match_tx
654                    .send(MatchingEntry {
655                        should_scan_tx: should_scan_tx,
656                        worktree_root: snapshot.abs_path().clone(),
657                        path: ProjectPath {
658                            worktree_id: snapshot.id(),
659                            path: entry.path.clone(),
660                        },
661                    })
662                    .await?;
663            }
664
665            anyhow::Ok(())
666        })
667        .await;
668    }
669}
670
/// A worktree file offered to the workers for path-based filtering.
struct InputPath {
    entry: Entry,
    /// Snapshot of the worktree that `entry` belongs to.
    snapshot: Snapshot,
    /// Receives the path if it should be fully scanned; dropping it unsent means "no match".
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
676
/// A path that passed the path filters and whose on-disk contents still have to be checked
/// for at least one match.
struct MatchingEntry {
    /// Root of the worktree; joined with `path` to locate the file on disk.
    worktree_root: Arc<Path>,
    path: ProjectPath,
    /// Receives `path` if the file contains a match; dropping it unsent means "no match".
    should_scan_tx: oneshot::Sender<ProjectPath>,
}