project_search.rs

   1use std::{
   2    cell::LazyCell,
   3    collections::BTreeSet,
   4    io::{BufRead, BufReader},
   5    ops::Range,
   6    path::{Path, PathBuf},
   7    pin::pin,
   8    sync::Arc,
   9};
  10
  11use anyhow::Context;
  12use collections::HashSet;
  13use fs::Fs;
  14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
  15use gpui::{App, AppContext, AsyncApp, Entity, Task};
  16use language::{Buffer, BufferSnapshot};
  17use parking_lot::Mutex;
  18use postage::oneshot;
  19use rpc::{AnyProtoClient, proto};
  20use smol::{
  21    channel::{Receiver, Sender, bounded, unbounded},
  22    future::FutureExt,
  23};
  24
  25use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
  26use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
  27
  28use crate::{
  29    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
  30    buffer_store::BufferStore,
  31    search::{SearchQuery, SearchResult},
  32    worktree_store::WorktreeStore,
  33};
  34
/// A configured project search that has not been started yet.
///
/// Built with [`Search::local`], [`Search::remote`] or [`Search::open_buffers_only`] and
/// consumed by [`Search::into_handle`].
pub struct Search {
    /// Store whose open buffers are inspected up front and through which candidate buffers are opened.
    buffer_store: Entity<BufferStore>,
    /// Store used to look up worktree entries and (for remote searches) to retain remotely created models.
    worktree_store: Entity<WorktreeStore>,
    /// Maximum number of candidate files; reduced by one for every searchable buffer that has no entry.
    limit: usize,
    /// Strategy used to discover candidate buffers.
    kind: SearchKind,
}
  41
/// Represents search setup, before it is actually kicked off with [`Search::into_handle`].
enum SearchKind {
    /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match.
    Local {
        /// File system handle handed to the workers that read candidate files from disk.
        fs: Arc<dyn Fs>,
        /// Worktrees whose files are enumerated as search inputs.
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
    Remote {
        /// Client used to send the `FindSearchCandidates` request.
        client: AnyProtoClient,
        /// Sent as the `project_id` of the request.
        remote_id: u64,
        /// Passed to `Project::retain_remotely_created_models_impl` while candidates are being received.
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
    /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host.
    OpenBuffersOnly,
}
  58
/// Represents results of project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for
/// at most one match in each file.
///
/// The search itself does not start until [`SearchResultsHandle::results`] or
/// [`SearchResultsHandle::matching_buffers`] is called.
#[must_use]
pub struct SearchResultsHandle {
    /// Stream of full search results (buffers together with their matched ranges).
    results: Receiver<SearchResult>,
    /// Stream of candidate buffers; this is the same channel the search pipeline itself reads snapshots from.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// One-shot closure that spawns the search task.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
  68
  69impl SearchResultsHandle {
  70    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
  71        (self.trigger_search)(cx).detach();
  72        self.results
  73    }
  74    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
  75        (self.trigger_search)(cx).detach();
  76        self.matching_buffers
  77    }
  78}
  79
/// Per-worker view of how search candidates are discovered. Cloned once for every worker.
#[derive(Clone)]
enum FindSearchCandidates {
    /// Candidates are found by scanning files on disk; workers take part in that scan.
    Local {
        /// File system used to open candidate files.
        fs: Arc<dyn Fs>,
        /// Start off with all paths in project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        /// Put another way: filter out files that can't match (without looking at file contents)
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        /// Receiving half of the "confirm on disk" queue; workers both feed and drain it.
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    },
    /// Candidates are supplied by a separate task that queries the remote host; workers only look for matches.
    Remote,
    /// Candidates are the already loaded buffers; workers only look for matches.
    OpenBuffersOnly,
}
  99
 100impl Search {
 101    pub fn local(
 102        fs: Arc<dyn Fs>,
 103        buffer_store: Entity<BufferStore>,
 104        worktree_store: Entity<WorktreeStore>,
 105        limit: usize,
 106        cx: &mut App,
 107    ) -> Self {
 108        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
 109        Self {
 110            kind: SearchKind::Local { fs, worktrees },
 111            buffer_store,
 112            worktree_store,
 113            limit,
 114        }
 115    }
 116
 117    pub(crate) fn remote(
 118        buffer_store: Entity<BufferStore>,
 119        worktree_store: Entity<WorktreeStore>,
 120        limit: usize,
 121        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
 122    ) -> Self {
 123        Self {
 124            kind: SearchKind::Remote {
 125                client: client_state.0,
 126                remote_id: client_state.1,
 127                models: client_state.2,
 128            },
 129            buffer_store,
 130            worktree_store,
 131            limit,
 132        }
 133    }
 134    pub(crate) fn open_buffers_only(
 135        buffer_store: Entity<BufferStore>,
 136        worktree_store: Entity<WorktreeStore>,
 137        limit: usize,
 138    ) -> Self {
 139        Self {
 140            kind: SearchKind::OpenBuffersOnly,
 141            buffer_store,
 142            worktree_store,
 143            limit,
 144        }
 145    }
 146
    /// Cap on the number of buffers reported as full search results before
    /// `SearchResult::LimitReached` is emitted.
    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    /// Cap on the total number of matched ranges reported before
    /// `SearchResult::LimitReached` is emitted.
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 149    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
 150    /// or full search results.
 151    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
 152        let mut open_buffers = HashSet::default();
 153        let mut unnamed_buffers = Vec::new();
 154        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 155        let buffers = self.buffer_store.read(cx);
 156        for handle in buffers.buffers() {
 157            let buffer = handle.read(cx);
 158            if !buffers.is_searchable(&buffer.remote_id()) {
 159                continue;
 160            } else if let Some(entry_id) = buffer.entry_id(cx) {
 161                open_buffers.insert(entry_id);
 162            } else {
 163                self.limit = self.limit.saturating_sub(1);
 164                unnamed_buffers.push(handle)
 165            };
 166        }
 167        let executor = cx.background_executor().clone();
 168        let (tx, rx) = unbounded();
 169        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
 170        let matching_buffers = grab_buffer_snapshot_rx.clone();
 171        let trigger_search = Box::new(move |cx: &mut App| {
 172            cx.spawn(async move |cx| {
 173                for buffer in unnamed_buffers {
 174                    _ = grab_buffer_snapshot_tx.send(buffer).await;
 175                }
 176
 177                let (find_all_matches_tx, find_all_matches_rx) =
 178                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
 179                let query = Arc::new(query);
 180                let (candidate_searcher, tasks) = match self.kind {
 181                    SearchKind::OpenBuffersOnly => {
 182                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
 183                        else {
 184                            return;
 185                        };
 186                        let fill_requests = cx
 187                            .background_spawn(async move {
 188                                for buffer in open_buffers {
 189                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
 190                                        return;
 191                                    }
 192                                }
 193                            })
 194                            .boxed_local();
 195                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
 196                    }
 197                    SearchKind::Local {
 198                        fs,
 199                        ref mut worktrees,
 200                    } => {
 201                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 202                            unbounded();
 203                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
 204                            bounded(64);
 205                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
 206
 207                        let (input_paths_tx, input_paths_rx) = unbounded();
 208                        let tasks = vec![
 209                            cx.spawn(Self::provide_search_paths(
 210                                std::mem::take(worktrees),
 211                                query.clone(),
 212                                input_paths_tx,
 213                                sorted_search_results_tx,
 214                            ))
 215                            .boxed_local(),
 216                            Self::open_buffers(
 217                                &self.buffer_store,
 218                                get_buffer_for_full_scan_rx,
 219                                grab_buffer_snapshot_tx,
 220                                cx.clone(),
 221                            )
 222                            .boxed_local(),
 223                            cx.background_spawn(Self::maintain_sorted_search_results(
 224                                sorted_search_results_rx,
 225                                get_buffer_for_full_scan_tx,
 226                                self.limit,
 227                            ))
 228                            .boxed_local(),
 229                        ];
 230                        (
 231                            FindSearchCandidates::Local {
 232                                fs,
 233                                confirm_contents_will_match_tx,
 234                                confirm_contents_will_match_rx,
 235                                input_paths_rx,
 236                            },
 237                            tasks,
 238                        )
 239                    }
 240                    SearchKind::Remote {
 241                        client,
 242                        remote_id,
 243                        models,
 244                    } => {
 245                        let request = client.request(proto::FindSearchCandidates {
 246                            project_id: remote_id,
 247                            query: Some(query.to_proto()),
 248                            limit: self.limit as _,
 249                        });
 250                        let Ok(guard) = cx.update(|cx| {
 251                            Project::retain_remotely_created_models_impl(
 252                                &models,
 253                                &self.buffer_store,
 254                                &self.worktree_store,
 255                                cx,
 256                            )
 257                        }) else {
 258                            return;
 259                        };
 260                        let buffer_store = self.buffer_store.downgrade();
 261                        let issue_remote_buffers_request = cx
 262                            .spawn(async move |cx| {
 263                                let _ = maybe!(async move {
 264                                    let response = request.await?;
 265                                    let (tx, rx) = unbounded();
 266                                    buffer_store.update(cx, |this, _| {
 267                                        this.register_project_search_result_handle(
 268                                            response.handle,
 269                                            tx,
 270                                        );
 271                                    })?;
 272
 273                                    while let Ok(buffer_id) = rx.recv().await {
 274                                        let buffer = buffer_store
 275                                            .update(cx, |buffer_store, cx| {
 276                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
 277                                            })?
 278                                            .await?;
 279                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
 280                                    }
 281
 282                                    drop(guard);
 283                                    anyhow::Ok(())
 284                                })
 285                                .await
 286                                .log_err();
 287                            })
 288                            .boxed_local();
 289                        (
 290                            FindSearchCandidates::Remote,
 291                            vec![issue_remote_buffers_request],
 292                        )
 293                    }
 294                };
 295
 296                let should_find_all_matches = !tx.is_closed();
 297
 298                let worker_pool = executor.scoped(|scope| {
 299                    let num_cpus = executor.num_cpus();
 300
 301                    assert!(num_cpus > 0);
 302                    for _ in 0..executor.num_cpus() - 1 {
 303                        let worker = Worker {
 304                            query: &query,
 305                            open_buffers: &open_buffers,
 306                            candidates: candidate_searcher.clone(),
 307                            find_all_matches_rx: find_all_matches_rx.clone(),
 308                        };
 309                        scope.spawn(worker.run());
 310                    }
 311
 312                    drop(find_all_matches_rx);
 313                    drop(candidate_searcher);
 314                });
 315
 316                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
 317                // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
 318                // just the files. *They are using the same stream as the guts of the project search do*.
 319                // This means that we cannot grab values off of that stream unless it's strictly needed for making a progress in project search.
 320                //
 321                // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
 322                // in all matches, running that task unconditionally would hinder caller's ability to observe all matching file paths.
 323                let buffer_snapshots = if should_find_all_matches {
 324                    Some(
 325                        Self::grab_buffer_snapshots(
 326                            grab_buffer_snapshot_rx,
 327                            find_all_matches_tx,
 328                            sorted_matches_tx,
 329                            cx.clone(),
 330                        )
 331                        .boxed_local(),
 332                    )
 333                } else {
 334                    drop(find_all_matches_tx);
 335
 336                    None
 337                };
 338                let ensure_matches_are_reported_in_order = if should_find_all_matches {
 339                    Some(
 340                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
 341                            .boxed_local(),
 342                    )
 343                } else {
 344                    drop(tx);
 345                    None
 346                };
 347
 348                futures::future::join_all(
 349                    [worker_pool.boxed_local()]
 350                        .into_iter()
 351                        .chain(buffer_snapshots)
 352                        .chain(ensure_matches_are_reported_in_order)
 353                        .chain(tasks),
 354                )
 355                .await;
 356            })
 357        });
 358
 359        SearchResultsHandle {
 360            results: rx,
 361            matching_buffers,
 362            trigger_search,
 363        }
 364    }
 365
 366    fn provide_search_paths(
 367        worktrees: Vec<Entity<Worktree>>,
 368        query: Arc<SearchQuery>,
 369        tx: Sender<InputPath>,
 370        results: Sender<oneshot::Receiver<ProjectPath>>,
 371    ) -> impl AsyncFnOnce(&mut AsyncApp) {
 372        async move |cx| {
 373            _ = maybe!(async move {
 374                let gitignored_tracker = PathInclusionMatcher::new(query.clone());
 375                for worktree in worktrees {
 376                    let (mut snapshot, worktree_settings) = worktree
 377                        .read_with(cx, |this, _| {
 378                            Some((this.snapshot(), this.as_local()?.settings()))
 379                        })?
 380                        .context("The worktree is not local")?;
 381                    if query.include_ignored() {
 382                        // Pre-fetch all of the ignored directories as they're going to be searched.
 383                        let mut entries_to_refresh = vec![];
 384
 385                        for entry in snapshot.entries(query.include_ignored(), 0) {
 386                            if gitignored_tracker.should_scan_gitignored_dir(
 387                                entry,
 388                                &snapshot,
 389                                &worktree_settings,
 390                            ) {
 391                                entries_to_refresh.push(entry.path.clone());
 392                            }
 393                        }
 394                        let barrier = worktree.update(cx, |this, _| {
 395                            let local = this.as_local_mut()?;
 396                            let barrier = entries_to_refresh
 397                                .into_iter()
 398                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
 399                                .collect::<Vec<_>>();
 400                            Some(barrier)
 401                        })?;
 402                        if let Some(barriers) = barrier {
 403                            futures::future::join_all(barriers).await;
 404                        }
 405                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
 406                    }
 407                    cx.background_executor()
 408                        .scoped(|scope| {
 409                            scope.spawn(async {
 410                                for entry in snapshot.files(query.include_ignored(), 0) {
 411                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
 412
 413                                    let Ok(_) = tx
 414                                        .send(InputPath {
 415                                            entry: entry.clone(),
 416                                            snapshot: snapshot.clone(),
 417                                            should_scan_tx,
 418                                        })
 419                                        .await
 420                                    else {
 421                                        return;
 422                                    };
 423                                    if results.send(should_scan_rx).await.is_err() {
 424                                        return;
 425                                    };
 426                                }
 427                            })
 428                        })
 429                        .await;
 430                }
 431                anyhow::Ok(())
 432            })
 433            .await;
 434        }
 435    }
 436
 437    async fn maintain_sorted_search_results(
 438        rx: Receiver<oneshot::Receiver<ProjectPath>>,
 439        paths_for_full_scan: Sender<ProjectPath>,
 440        limit: usize,
 441    ) {
 442        let mut rx = pin!(rx);
 443        let mut matched = 0;
 444        while let Some(mut next_path_result) = rx.next().await {
 445            let Some(successful_path) = next_path_result.next().await else {
 446                // This file did not produce a match, hence skip it.
 447                continue;
 448            };
 449            if paths_for_full_scan.send(successful_path).await.is_err() {
 450                return;
 451            };
 452            matched += 1;
 453            if matched >= limit {
 454                break;
 455            }
 456        }
 457    }
 458
 459    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
 460    async fn open_buffers(
 461        buffer_store: &Entity<BufferStore>,
 462        rx: Receiver<ProjectPath>,
 463        find_all_matches_tx: Sender<Entity<Buffer>>,
 464        mut cx: AsyncApp,
 465    ) {
 466        let mut rx = pin!(rx.ready_chunks(64));
 467        _ = maybe!(async move {
 468            while let Some(requested_paths) = rx.next().await {
 469                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
 470                    requested_paths
 471                        .into_iter()
 472                        .map(|path| this.open_buffer(path, cx))
 473                        .collect::<FuturesOrdered<_>>()
 474                })?;
 475
 476                while let Some(buffer) = buffers.next().await {
 477                    if let Some(buffer) = buffer.log_err() {
 478                        find_all_matches_tx.send(buffer).await?;
 479                    }
 480                }
 481            }
 482            Result::<_, anyhow::Error>::Ok(())
 483        })
 484        .await;
 485    }
 486
 487    async fn grab_buffer_snapshots(
 488        rx: Receiver<Entity<Buffer>>,
 489        find_all_matches_tx: Sender<(
 490            Entity<Buffer>,
 491            BufferSnapshot,
 492            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 493        )>,
 494        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 495        mut cx: AsyncApp,
 496    ) {
 497        _ = maybe!(async move {
 498            while let Ok(buffer) = rx.recv().await {
 499                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
 500                let (tx, rx) = oneshot::channel();
 501                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
 502                results.send(rx).await?;
 503            }
 504            debug_assert!(rx.is_empty());
 505            Result::<_, anyhow::Error>::Ok(())
 506        })
 507        .await;
 508    }
 509
 510    async fn ensure_matched_ranges_are_reported_in_order(
 511        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 512        tx: Sender<SearchResult>,
 513    ) {
 514        use postage::stream::Stream;
 515        _ = maybe!(async move {
 516            let mut matched_buffers = 0;
 517            let mut matches = 0;
 518            while let Ok(mut next_buffer_matches) = rx.recv().await {
 519                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
 520                    continue;
 521                };
 522
 523                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
 524                    || matches > Search::MAX_SEARCH_RESULT_RANGES
 525                {
 526                    _ = tx.send(SearchResult::LimitReached).await;
 527                    break;
 528                }
 529                matched_buffers += 1;
 530                matches += ranges.len();
 531
 532                _ = tx.send(SearchResult::Buffer { buffer, ranges }).await?;
 533            }
 534            anyhow::Ok(())
 535        })
 536        .await;
 537    }
 538
 539    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
 540        let worktree_store = self.worktree_store.read(cx);
 541        let mut buffers = search_query
 542            .buffers()
 543            .into_iter()
 544            .flatten()
 545            .filter(|buffer| {
 546                let b = buffer.read(cx);
 547                if let Some(file) = b.file() {
 548                    if !search_query.match_path(file.path()) {
 549                        return false;
 550                    }
 551                    if !search_query.include_ignored()
 552                        && let Some(entry) = b
 553                            .entry_id(cx)
 554                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
 555                        && entry.is_ignored
 556                    {
 557                        return false;
 558                    }
 559                }
 560                true
 561            })
 562            .cloned()
 563            .collect::<Vec<_>>();
 564        buffers.sort_by(|a, b| {
 565            let a = a.read(cx);
 566            let b = b.read(cx);
 567            match (a.file(), b.file()) {
 568                (None, None) => a.remote_id().cmp(&b.remote_id()),
 569                (None, Some(_)) => std::cmp::Ordering::Less,
 570                (Some(_), None) => std::cmp::Ordering::Greater,
 571                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
 572            }
 573        });
 574
 575        buffers
 576    }
 577}
 578
/// One background worker of the search pool; several run concurrently and share the same channels.
struct Worker<'search> {
    /// The query being executed.
    query: &'search SearchQuery,
    /// Entry ids of buffers that are already open; such files skip the on-disk check.
    open_buffers: &'search HashSet<ProjectEntryId>,
    /// How candidates are discovered; determines which pipeline stages this worker services.
    candidates: FindSearchCandidates,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    /// Then, when you're done, share them via the channel you were given.
    find_all_matches_rx: Receiver<(
        Entity<Buffer>,
        BufferSnapshot,
        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
    )>,
}
 591
 592impl Worker<'_> {
 593    async fn run(self) {
 594        let (
 595            input_paths_rx,
 596            confirm_contents_will_match_rx,
 597            mut confirm_contents_will_match_tx,
 598            fs,
 599        ) = match self.candidates {
 600            FindSearchCandidates::Local {
 601                fs,
 602                input_paths_rx,
 603                confirm_contents_will_match_rx,
 604                confirm_contents_will_match_tx,
 605            } => (
 606                input_paths_rx,
 607                confirm_contents_will_match_rx,
 608                confirm_contents_will_match_tx,
 609                Some(fs),
 610            ),
 611            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => {
 612                (unbounded().1, unbounded().1, unbounded().0, None)
 613            }
 614        };
 615        // WorkerA: grabs a request for "find all matches in file/a" <- takes 5 minutes
 616        // right after: WorkerB: grabs a request for "find all matches in file/b" <- takes 5 seconds
 617        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
 618        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
 619        let mut scan_path = pin!(input_paths_rx.fuse());
 620
 621        loop {
 622            let handler = RequestHandler {
 623                query: self.query,
 624                open_entries: &self.open_buffers,
 625                fs: fs.as_deref(),
 626                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
 627            };
 628            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
 629            // steps straight away. Another worker might be about to produce a value that will
 630            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
 631            // That way, we'll only ever close a next-stage channel when ALL workers do so.
 632            select_biased! {
 633                find_all_matches = find_all_matches.next() => {
 634                    let Some(matches) = find_all_matches else {
 635                        continue;
 636                    };
 637                    handler.handle_find_all_matches(matches).await;
 638                },
 639                find_first_match = find_first_match.next() => {
 640                    if let Some(buffer_with_at_least_one_match) = find_first_match {
 641                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
 642                    }
 643                },
 644                scan_path = scan_path.next() => {
 645                    if let Some(path_to_scan) = scan_path {
 646                        handler.handle_scan_path(path_to_scan).await;
 647                    } else {
 648                        // If we're the last worker to notice that this is not producing values, close the upstream.
 649                        confirm_contents_will_match_tx = bounded(1).0;
 650                    }
 651
 652                 }
 653                 complete => {
 654                     break
 655                },
 656
 657            }
 658        }
 659    }
 660}
 661
/// Borrowed view of a [`Worker`]'s state used to handle a single pipeline request.
struct RequestHandler<'worker> {
    /// The query being executed.
    query: &'worker SearchQuery,
    /// File system used to open candidate files; `None` when the worker was not given one.
    fs: Option<&'worker dyn Fs>,
    /// Entry ids of buffers that are already open.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Queue of files whose on-disk contents still need to be checked for a first match.
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
}
 668
 669impl RequestHandler<'_> {
 670    async fn handle_find_all_matches(
 671        &self,
 672        (buffer, snapshot, mut report_matches): (
 673            Entity<Buffer>,
 674            BufferSnapshot,
 675            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 676        ),
 677    ) {
 678        let ranges = self
 679            .query
 680            .search(&snapshot, None)
 681            .await
 682            .iter()
 683            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
 684            .collect::<Vec<_>>();
 685
 686        _ = report_matches.send((buffer, ranges)).await;
 687    }
 688
 689    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
 690        _=maybe!(async move {
 691            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
 692            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
 693                return anyhow::Ok(());
 694            };
 695
 696            let mut file = BufReader::new(file);
 697            let file_start = file.fill_buf()?;
 698
 699            if let Err(Some(starting_position)) =
 700            std::str::from_utf8(file_start).map_err(|e| e.error_len())
 701            {
 702                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
 703                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
 704                log::debug!(
 705                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
 706                );
 707                return Ok(());
 708            }
 709
 710            if self.query.detect(file).unwrap_or(false) {
 711                // Yes, we should scan the whole file.
 712                entry.should_scan_tx.send(entry.path).await?;
 713            }
 714            Ok(())
 715        }).await;
 716    }
 717
 718    async fn handle_scan_path(&self, req: InputPath) {
 719        _ = maybe!(async move {
 720            let InputPath {
 721                entry,
 722                snapshot,
 723                mut should_scan_tx,
 724            } = req;
 725
 726            if entry.is_fifo || !entry.is_file() {
 727                return Ok(());
 728            }
 729
 730            if self.query.filters_path() {
 731                let matched_path = if self.query.match_full_paths() {
 732                    let mut full_path = snapshot.root_name().to_owned();
 733                    full_path.push(&entry.path);
 734                    self.query.match_path(&full_path)
 735                } else {
 736                    self.query.match_path(&entry.path)
 737                };
 738                if !matched_path {
 739                    return Ok(());
 740                }
 741            }
 742
 743            if self.open_entries.contains(&entry.id) {
 744                // The buffer is already in memory and that's the version we want to scan;
 745                // hence skip the dilly-dally and look for all matches straight away.
 746                should_scan_tx
 747                    .send(ProjectPath {
 748                        worktree_id: snapshot.id(),
 749                        path: entry.path.clone(),
 750                    })
 751                    .await?;
 752            } else {
 753                self.confirm_contents_will_match_tx
 754                    .send(MatchingEntry {
 755                        should_scan_tx: should_scan_tx,
 756                        worktree_root: snapshot.abs_path().clone(),
 757                        path: ProjectPath {
 758                            worktree_id: snapshot.id(),
 759                            path: entry.path.clone(),
 760                        },
 761                    })
 762                    .await?;
 763            }
 764
 765            anyhow::Ok(())
 766        })
 767        .await;
 768    }
 769}
 770
 771struct InputPath {
 772    entry: Entry,
 773    snapshot: Snapshot,
 774    should_scan_tx: oneshot::Sender<ProjectPath>,
 775}
 776
 777struct MatchingEntry {
 778    worktree_root: Arc<Path>,
 779    path: ProjectPath,
 780    should_scan_tx: oneshot::Sender<ProjectPath>,
 781}
 782
 783/// This struct encapsulates the logic to decide whether a given gitignored directory should be
 784/// scanned based on include/exclude patterns of a search query (as include/exclude parameters may match paths inside it).
 785/// It is kind-of doing an inverse of glob. Given a glob pattern like `src/**/` and a parent path like `src`, we need to decide whether the parent
 786/// may contain glob hits.
 787struct PathInclusionMatcher {
 788    included: BTreeSet<PathBuf>,
 789    query: Arc<SearchQuery>,
 790}
 791
 792impl PathInclusionMatcher {
 793    fn new(query: Arc<SearchQuery>) -> Self {
 794        let mut included = BTreeSet::new();
 795        // To do an inverse glob match, we split each glob into it's prefix and the glob part.
 796        // For example, `src/**/*.rs` becomes `src/` and `**/*.rs`. The glob part gets dropped.
 797        // Then, when checking whether a given directory should be scanned, we check whether it is a non-empty substring of any glob prefix.
 798        if query.filters_path() {
 799            included.extend(
 800                query
 801                    .files_to_include()
 802                    .sources()
 803                    .flat_map(|glob| Some(wax::Glob::new(glob).ok()?.partition().0)),
 804            );
 805        }
 806        Self { included, query }
 807    }
 808
 809    fn should_scan_gitignored_dir(
 810        &self,
 811        entry: &Entry,
 812        snapshot: &Snapshot,
 813        worktree_settings: &WorktreeSettings,
 814    ) -> bool {
 815        if !entry.is_ignored || !entry.kind.is_unloaded() {
 816            return false;
 817        }
 818        if !self.query.include_ignored() {
 819            return false;
 820        }
 821        if worktree_settings.is_path_excluded(&entry.path) {
 822            return false;
 823        }
 824        if !self.query.filters_path() {
 825            return true;
 826        }
 827
 828        let as_abs_path = LazyCell::new(move || snapshot.absolutize(&entry.path));
 829        let entry_path = &entry.path;
 830        // 3. Check Exclusions (Pruning)
 831        // If the current path is a child of an excluded path, we stop.
 832        let is_excluded = self.path_is_definitely_excluded(&entry_path, snapshot);
 833
 834        if is_excluded {
 835            return false;
 836        }
 837
 838        // 4. Check Inclusions (Traversal)
 839        if self.included.is_empty() {
 840            return true;
 841        }
 842
 843        // We scan if the current path is a descendant of an include prefix
 844        // OR if the current path is an ancestor of an include prefix (we need to go deeper to find it).
 845        let is_included = self.included.iter().any(|prefix| {
 846            let (prefix_matches_entry, entry_matches_prefix) = if prefix.is_absolute() {
 847                (
 848                    prefix.starts_with(&**as_abs_path),
 849                    as_abs_path.starts_with(prefix),
 850                )
 851            } else {
 852                RelPath::new(prefix, snapshot.path_style()).map_or((false, false), |prefix| {
 853                    (
 854                        prefix.starts_with(entry_path),
 855                        entry_path.starts_with(&prefix),
 856                    )
 857                })
 858            };
 859
 860            // Logic:
 861            // 1. entry_matches_prefix: We are inside the target zone (e.g. glob: src/, current: src/lib/). Keep scanning.
 862            // 2. prefix_matches_entry: We are above the target zone (e.g. glob: src/foo/, current: src/). Keep scanning to reach foo.
 863            prefix_matches_entry || entry_matches_prefix
 864        });
 865
 866        is_included
 867    }
 868    fn path_is_definitely_excluded(&self, path: &RelPath, snapshot: &Snapshot) -> bool {
 869        if !self.query.files_to_exclude().sources().next().is_none() {
 870            let mut path = if self.query.match_full_paths() {
 871                let mut full_path = snapshot.root_name().to_owned();
 872                full_path.push(path);
 873                full_path
 874            } else {
 875                path.to_owned()
 876            };
 877            loop {
 878                if self.query.files_to_exclude().is_match(&path) {
 879                    return true;
 880                } else if !path.pop() {
 881                    return false;
 882                }
 883            }
 884        } else {
 885            false
 886        }
 887    }
 888}
 889
 890#[cfg(test)]
 891mod tests {
 892    use super::*;
 893    use fs::FakeFs;
 894    use serde_json::json;
 895    use settings::Settings;
 896    use util::{
 897        path,
 898        paths::{PathMatcher, PathStyle},
 899        rel_path::RelPath,
 900    };
 901    use worktree::{Entry, EntryKind, WorktreeSettings};
 902
 903    use crate::{
 904        Project, project_search::PathInclusionMatcher, project_tests::init_test,
 905        search::SearchQuery,
 906    };
 907
 908    #[gpui::test]
 909    async fn test_path_inclusion_matcher(cx: &mut gpui::TestAppContext) {
 910        init_test(cx);
 911
 912        let fs = FakeFs::new(cx.background_executor.clone());
 913        fs.insert_tree(
 914            "/root",
 915            json!({
 916                ".gitignore": "src/data/\n",
 917                "src": {
 918                    "data": {
 919                        "main.csv": "field_1,field_2,field_3",
 920                    },
 921                    "lib": {
 922                        "main.txt": "Are you familiar with fields?",
 923                    },
 924                },
 925            }),
 926        )
 927        .await;
 928
 929        let project = Project::test(fs.clone(), [path!("/root").as_ref()], cx).await;
 930        let worktree = project.update(cx, |project, cx| project.worktrees(cx).next().unwrap());
 931        let (worktree_settings, worktree_snapshot) = worktree.update(cx, |worktree, cx| {
 932            let settings_location = worktree.settings_location(cx);
 933            return (
 934                WorktreeSettings::get(Some(settings_location), cx).clone(),
 935                worktree.snapshot(),
 936            );
 937        });
 938
 939        // Manually create a test entry for the gitignored directory since it won't
 940        // be loaded by the worktree
 941        let entry = Entry {
 942            id: ProjectEntryId::from_proto(1),
 943            kind: EntryKind::UnloadedDir,
 944            path: Arc::from(RelPath::unix(Path::new("src/data")).unwrap()),
 945            inode: 0,
 946            mtime: None,
 947            canonical_path: None,
 948            is_ignored: true,
 949            is_hidden: false,
 950            is_always_included: false,
 951            is_external: false,
 952            is_private: false,
 953            size: 0,
 954            char_bag: Default::default(),
 955            is_fifo: false,
 956        };
 957
 958        // 1. Test searching for `field`, including ignored files without any
 959        // inclusion and exclusion filters.
 960        let include_ignored = true;
 961        let files_to_include = PathMatcher::default();
 962        let files_to_exclude = PathMatcher::default();
 963        let match_full_paths = false;
 964        let search_query = SearchQuery::text(
 965            "field",
 966            false,
 967            false,
 968            include_ignored,
 969            files_to_include,
 970            files_to_exclude,
 971            match_full_paths,
 972            None,
 973        )
 974        .unwrap();
 975
 976        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
 977        assert!(path_matcher.should_scan_gitignored_dir(
 978            &entry,
 979            &worktree_snapshot,
 980            &worktree_settings
 981        ));
 982
 983        // 2. Test searching for `field`, including ignored files but updating
 984        // `files_to_include` to only include files under `src/lib`.
 985        let include_ignored = true;
 986        let files_to_include = PathMatcher::new(vec!["src/lib"], PathStyle::Posix).unwrap();
 987        let files_to_exclude = PathMatcher::default();
 988        let match_full_paths = false;
 989        let search_query = SearchQuery::text(
 990            "field",
 991            false,
 992            false,
 993            include_ignored,
 994            files_to_include,
 995            files_to_exclude,
 996            match_full_paths,
 997            None,
 998        )
 999        .unwrap();
1000
1001        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
1002        assert!(!path_matcher.should_scan_gitignored_dir(
1003            &entry,
1004            &worktree_snapshot,
1005            &worktree_settings
1006        ));
1007    }
1008}