project_search.rs

   1use std::{
   2    cell::LazyCell,
   3    collections::BTreeSet,
   4    io::{BufRead, BufReader},
   5    ops::Range,
   6    path::{Path, PathBuf},
   7    pin::pin,
   8    sync::Arc,
   9};
  10
  11use anyhow::Context;
  12use collections::HashSet;
  13use fs::Fs;
  14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
  15use gpui::{App, AppContext, AsyncApp, Entity, Task};
  16use language::{Buffer, BufferSnapshot};
  17use parking_lot::Mutex;
  18use postage::oneshot;
  19use rpc::{AnyProtoClient, proto};
  20use smol::{
  21    channel::{Receiver, Sender, bounded, unbounded},
  22    future::FutureExt,
  23};
  24
  25use text::BufferId;
  26use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
  27use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
  28
  29use crate::{
  30    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
  31    buffer_store::BufferStore,
  32    search::{SearchQuery, SearchResult},
  33    worktree_store::WorktreeStore,
  34};
  35
  36pub struct Search {
  37    buffer_store: Entity<BufferStore>,
  38    worktree_store: Entity<WorktreeStore>,
  39    limit: usize,
  40    kind: SearchKind,
  41}
  42
  43/// Represents search setup, before it is actually kicked off via [`Search::into_handle`].
  44enum SearchKind {
  45    /// Search for candidates by inspecting file contents on the file system, avoiding loading a buffer unless we know that a given file contains a match.
  46    Local {
  47        fs: Arc<dyn Fs>,
  48        worktrees: Vec<Entity<Worktree>>,
  49    },
  50    /// Query the remote host for candidates. As of this writing, the host runs a local search in "buffers with matches only" mode.
  51    Remote {
  52        client: AnyProtoClient,
  53        remote_id: u64,
  54        models: Arc<Mutex<RemotelyCreatedModels>>,
  55    },
  56    /// Run the search against a known set of candidates. Even when working with a remote host, this won't round-trip to the host.
  57    OpenBuffersOnly,
  58}
  59
  60/// Represents the results of a project search and allows one to obtain either the match positions OR
  61/// just the handles to buffers that may contain matches. Grabbing the handles is cheaper than obtaining full match positions,
  62/// because in that case we'll look for at most one match in each file.
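    ///
    /// A minimal usage sketch (not a compiled doctest), assuming a `search` built via
    /// [`Search::local`] with a `query` and `cx` in scope:
    ///
    /// ```ignore
    /// let handle = search.into_handle(query, cx);
    /// // Either stream the full results...
    /// let mut results = handle.results(cx);
    /// // ...or, with a separate handle, obtain only the buffers that may contain matches:
    /// // let mut buffers = other_handle.matching_buffers(cx);
    /// ```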
  63#[must_use]
  64pub struct SearchResultsHandle {
  65    results: Receiver<SearchResult>,
  66    matching_buffers: Receiver<Entity<Buffer>>,
  67    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
  68}
  69
  70impl SearchResultsHandle {
  71    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
  72        (self.trigger_search)(cx).detach();
  73        self.results
  74    }
  75    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
  76        (self.trigger_search)(cx).detach();
  77        self.matching_buffers
  78    }
  79}
  80
  81#[derive(Clone)]
  82enum FindSearchCandidates {
  83    Local {
  84        fs: Arc<dyn Fs>,
  85        /// Start off with all paths in the project and filter them based on:
  86        /// - Include filters
  87        /// - Exclude filters
  88        /// - Only open buffers
  89        /// - Scan ignored files
  90        /// Put another way: filter out files that can't match (without looking at file contents)
  91        input_paths_rx: Receiver<InputPath>,
  92        /// After that, if the buffer is not yet loaded, we'll figure out whether it contains at least one match
  93        /// based on its contents on disk. This step is not performed for buffers we already have in memory.
  94        confirm_contents_will_match_tx: Sender<MatchingEntry>,
  95        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
  96        /// Of those that contain at least one match (or are already in memory), look for the rest of the matches (and figure out their ranges).
  97        /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
  98        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
  99    },
 100    Remote,
 101    OpenBuffersOnly,
 102}
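    // A sketch of how these pieces are wired together in `Search::into_handle` for a local search
    // (data flows left to right):
    //
    //   input_paths_rx --(path filters)--> confirm_contents_will_match_{tx,rx} --(scan file on disk)-->
    //   get_buffer_for_full_scan_tx --(open buffer on the main thread)--> find_all_matches --> SearchResult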
 103
 104impl Search {
 105    pub fn local(
 106        fs: Arc<dyn Fs>,
 107        buffer_store: Entity<BufferStore>,
 108        worktree_store: Entity<WorktreeStore>,
 109        limit: usize,
 110        cx: &mut App,
 111    ) -> Self {
 112        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
 113        Self {
 114            kind: SearchKind::Local { fs, worktrees },
 115            buffer_store,
 116            worktree_store,
 117            limit,
 118        }
 119    }
 120
 121    pub(crate) fn remote(
 122        buffer_store: Entity<BufferStore>,
 123        worktree_store: Entity<WorktreeStore>,
 124        limit: usize,
 125        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
 126    ) -> Self {
 127        Self {
 128            kind: SearchKind::Remote {
 129                client: client_state.0,
 130                remote_id: client_state.1,
 131                models: client_state.2,
 132            },
 133            buffer_store,
 134            worktree_store,
 135            limit,
 136        }
 137    }
 138    pub(crate) fn open_buffers_only(
 139        buffer_store: Entity<BufferStore>,
 140        worktree_store: Entity<WorktreeStore>,
 141        limit: usize,
 142    ) -> Self {
 143        Self {
 144            kind: SearchKind::OpenBuffersOnly,
 145            buffer_store,
 146            worktree_store,
 147            limit,
 148        }
 149    }
 150
 151    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 152    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 153    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
 154    /// or full search results.
 155    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
 156        let mut open_buffers = HashSet::default();
 157        let mut unnamed_buffers = Vec::new();
 158        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 159        let buffers = self.buffer_store.read(cx);
 160        for handle in buffers.buffers() {
 161            let buffer = handle.read(cx);
 162            if !buffers.is_searchable(&buffer.remote_id()) {
 163                continue;
 164            } else if let Some(entry_id) = buffer.entry_id(cx) {
 165                open_buffers.insert(entry_id);
 166            } else {
 167                self.limit = self.limit.saturating_sub(1);
 168                unnamed_buffers.push(handle)
 169            };
 170        }
 171        let executor = cx.background_executor().clone();
 172        let (tx, rx) = unbounded();
 173        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
 174        let matching_buffers = grab_buffer_snapshot_rx.clone();
 175        let trigger_search = Box::new(move |cx: &mut App| {
 176            cx.spawn(async move |cx| {
 177                for buffer in unnamed_buffers {
 178                    _ = grab_buffer_snapshot_tx.send(buffer).await;
 179                }
 180
 181                let (find_all_matches_tx, find_all_matches_rx) =
 182                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
 183                let query = Arc::new(query);
 184                let (candidate_searcher, tasks) = match self.kind {
 185                    SearchKind::OpenBuffersOnly => {
 186                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
 187                        else {
 188                            return;
 189                        };
 190                        let fill_requests = cx
 191                            .background_spawn(async move {
 192                                for buffer in open_buffers {
 193                                    if grab_buffer_snapshot_tx.send(buffer).await.is_err() {
 194                                        return;
 195                                    }
 196                                }
 197                            })
 198                            .boxed_local();
 199                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
 200                    }
 201                    SearchKind::Local {
 202                        fs,
 203                        ref mut worktrees,
 204                    } => {
 205                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 206                            unbounded();
 207                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
 208                            bounded(64);
 209                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
 210
 211                        let (input_paths_tx, input_paths_rx) = unbounded();
 212                        let tasks = vec![
 213                            cx.spawn(Self::provide_search_paths(
 214                                std::mem::take(worktrees),
 215                                query.clone(),
 216                                input_paths_tx,
 217                                sorted_search_results_tx,
 218                            ))
 219                            .boxed_local(),
 220                            Self::open_buffers(
 221                                &self.buffer_store,
 222                                get_buffer_for_full_scan_rx,
 223                                grab_buffer_snapshot_tx,
 224                                cx.clone(),
 225                            )
 226                            .boxed_local(),
 227                            cx.background_spawn(Self::maintain_sorted_search_results(
 228                                sorted_search_results_rx,
 229                                get_buffer_for_full_scan_tx.clone(),
 230                                self.limit,
 231                            ))
 232                            .boxed_local(),
 233                        ];
 234                        (
 235                            FindSearchCandidates::Local {
 236                                fs,
 237                                get_buffer_for_full_scan_tx,
 238                                confirm_contents_will_match_tx,
 239                                confirm_contents_will_match_rx,
 240                                input_paths_rx,
 241                            },
 242                            tasks,
 243                        )
 244                    }
 245                    SearchKind::Remote {
 246                        client,
 247                        remote_id,
 248                        models,
 249                    } => {
 250                        let request = client.request(proto::FindSearchCandidates {
 251                            project_id: remote_id,
 252                            query: Some(query.to_proto()),
 253                            limit: self.limit as _,
 254                        });
 255                        let Ok(guard) = cx.update(|cx| {
 256                            Project::retain_remotely_created_models_impl(
 257                                &models,
 258                                &self.buffer_store,
 259                                &self.worktree_store,
 260                                cx,
 261                            )
 262                        }) else {
 263                            return;
 264                        };
 265                        let buffer_store = self.buffer_store.downgrade();
 266                        let issue_remote_buffers_request = cx
 267                            .spawn(async move |cx| {
 268                                let _ = maybe!(async move {
 269                                    let response = request.await?;
 270                                    for buffer_id in response.buffer_ids {
 271                                        let buffer_id = BufferId::new(buffer_id)?;
 272                                        let buffer = buffer_store
 273                                            .update(cx, |buffer_store, cx| {
 274                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
 275                                            })?
 276                                            .await?;
 277                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
 278                                    }
 279
 280                                    drop(guard);
 281                                    anyhow::Ok(())
 282                                })
 283                                .await
 284                                .log_err();
 285                            })
 286                            .boxed_local();
 287                        (
 288                            FindSearchCandidates::Remote,
 289                            vec![issue_remote_buffers_request],
 290                        )
 291                    }
 292                };
 293
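                    // `tx` feeds `SearchResultsHandle::results`; it is closed when the caller consumed the
                    // handle via `matching_buffers`, dropping the results receiver. In that case we can skip
                    // computing full match ranges and merely surface the matching buffers.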
 294                let should_find_all_matches = !tx.is_closed();
 295
 296                let worker_pool = executor.scoped(|scope| {
 297                    let num_cpus = executor.num_cpus();
 298
 299                    assert!(num_cpus > 0);
 300                    for _ in 0..num_cpus - 1 {
 301                        let worker = Worker {
 302                            query: &query,
 303                            open_buffers: &open_buffers,
 304                            candidates: candidate_searcher.clone(),
 305                            find_all_matches_rx: find_all_matches_rx.clone(),
 306                        };
 307                        scope.spawn(worker.run());
 308                    }
 309
 310                    drop(find_all_matches_rx);
 311                    drop(candidate_searcher);
 312                });
 313
 314                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
 315                // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
 316                // just the files. *They consume the same stream that the guts of the project search use*.
 317                // This means that we cannot grab values off of that stream unless it's strictly needed for making progress in the project search.
 318                //
 319                // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
 320                // in all matches, running that task unconditionally would hinder the caller's ability to observe all matching file paths.
 321                let buffer_snapshots = if should_find_all_matches {
 322                    Some(
 323                        Self::grab_buffer_snapshots(
 324                            grab_buffer_snapshot_rx,
 325                            find_all_matches_tx,
 326                            sorted_matches_tx,
 327                            cx.clone(),
 328                        )
 329                        .boxed_local(),
 330                    )
 331                } else {
 332                    drop(find_all_matches_tx);
 333
 334                    None
 335                };
 336                let ensure_matches_are_reported_in_order = if should_find_all_matches {
 337                    Some(
 338                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
 339                            .boxed_local(),
 340                    )
 341                } else {
 342                    drop(tx);
 343                    None
 344                };
 345
 346                futures::future::join_all(
 347                    [worker_pool.boxed_local()]
 348                        .into_iter()
 349                        .chain(buffer_snapshots)
 350                        .chain(ensure_matches_are_reported_in_order)
 351                        .chain(tasks),
 352                )
 353                .await;
 354            })
 355        });
 356
 357        SearchResultsHandle {
 358            results: rx,
 359            matching_buffers,
 360            trigger_search,
 361        }
 362    }
 363
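        /// Walks each worktree's snapshot and feeds every file entry into the filtering pipeline,
        /// together with a oneshot sender used to report whether that file should get a full scan.
        /// If the query includes ignored files, gitignored directories that may contain matches are
        /// loaded into the worktree first.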
 364    fn provide_search_paths(
 365        worktrees: Vec<Entity<Worktree>>,
 366        query: Arc<SearchQuery>,
 367        tx: Sender<InputPath>,
 368        results: Sender<oneshot::Receiver<ProjectPath>>,
 369    ) -> impl AsyncFnOnce(&mut AsyncApp) {
 370        async move |cx| {
 371            _ = maybe!(async move {
 372                let gitignored_tracker = PathInclusionMatcher::new(query.clone());
 373                for worktree in worktrees {
 374                    let (mut snapshot, worktree_settings) = worktree
 375                        .read_with(cx, |this, _| {
 376                            Some((this.snapshot(), this.as_local()?.settings()))
 377                        })?
 378                        .context("The worktree is not local")?;
 379                    if query.include_ignored() {
 380                        // Pre-fetch all of the ignored directories as they're going to be searched.
 381                        let mut entries_to_refresh = vec![];
 382
 383                        for entry in snapshot.entries(query.include_ignored(), 0) {
 384                            if gitignored_tracker.should_scan_gitignored_dir(
 385                                entry,
 386                                &snapshot,
 387                                &worktree_settings,
 388                            ) {
 389                                entries_to_refresh.push(entry.path.clone());
 390                            }
 391                        }
 392                        let barrier = worktree.update(cx, |this, _| {
 393                            let local = this.as_local_mut()?;
 394                            let barrier = entries_to_refresh
 395                                .into_iter()
 396                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
 397                                .collect::<Vec<_>>();
 398                            Some(barrier)
 399                        })?;
 400                        if let Some(barriers) = barrier {
 401                            futures::future::join_all(barriers).await;
 402                        }
 403                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
 404                    }
 405                    cx.background_executor()
 406                        .scoped(|scope| {
 407                            scope.spawn(async {
 408                                for entry in snapshot.files(query.include_ignored(), 0) {
 409                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
 410
 411                                    let Ok(_) = tx
 412                                        .send(InputPath {
 413                                            entry: entry.clone(),
 414                                            snapshot: snapshot.clone(),
 415                                            should_scan_tx,
 416                                        })
 417                                        .await
 418                                    else {
 419                                        return;
 420                                    };
 421                                    if results.send(should_scan_rx).await.is_err() {
 422                                        return;
 423                                    };
 424                                }
 425                            })
 426                        })
 427                        .await;
 428                }
 429                anyhow::Ok(())
 430            })
 431            .await;
 432        }
 433    }
 434
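        /// Receives, in worktree traversal order, one oneshot receiver per candidate file and forwards
        /// the paths that were confirmed to contain a match to the "open buffer" stage, stopping once
        /// `limit` files have matched.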
 435    async fn maintain_sorted_search_results(
 436        rx: Receiver<oneshot::Receiver<ProjectPath>>,
 437        paths_for_full_scan: Sender<ProjectPath>,
 438        limit: usize,
 439    ) {
 440        let mut rx = pin!(rx);
 441        let mut matched = 0;
 442        while let Some(mut next_path_result) = rx.next().await {
 443            let Some(successful_path) = next_path_result.next().await else {
 444                // This file did not produce a match, hence skip it.
 445                continue;
 446            };
 447            if paths_for_full_scan.send(successful_path).await.is_err() {
 448                return;
 449            };
 450            matched += 1;
 451            if matched >= limit {
 452                break;
 453            }
 454        }
 455    }
 456
 457    /// Background workers cannot open buffers by themselves, hence the main thread will do it on their behalf.
 458    async fn open_buffers(
 459        buffer_store: &Entity<BufferStore>,
 460        rx: Receiver<ProjectPath>,
 461        find_all_matches_tx: Sender<Entity<Buffer>>,
 462        mut cx: AsyncApp,
 463    ) {
 464        let mut rx = pin!(rx.ready_chunks(64));
 465        _ = maybe!(async move {
 466            while let Some(requested_paths) = rx.next().await {
 467                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
 468                    requested_paths
 469                        .into_iter()
 470                        .map(|path| this.open_buffer(path, cx))
 471                        .collect::<FuturesOrdered<_>>()
 472                })?;
 473
 474                while let Some(buffer) = buffers.next().await {
 475                    if let Some(buffer) = buffer.log_err() {
 476                        find_all_matches_tx.send(buffer).await?;
 477                    }
 478                }
 479            }
 480            Result::<_, anyhow::Error>::Ok(())
 481        })
 482        .await;
 483    }
 484
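        /// Runs on the main thread: snapshots each buffer that needs a full scan, hands the snapshot
        /// to the background workers, and queues a oneshot receiver so that matched ranges can later
        /// be reported in the order the buffers arrived.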
 485    async fn grab_buffer_snapshots(
 486        rx: Receiver<Entity<Buffer>>,
 487        find_all_matches_tx: Sender<(
 488            Entity<Buffer>,
 489            BufferSnapshot,
 490            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 491        )>,
 492        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 493        mut cx: AsyncApp,
 494    ) {
 495        _ = maybe!(async move {
 496            while let Ok(buffer) = rx.recv().await {
 497                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
 498                let (tx, rx) = oneshot::channel();
 499                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
 500                results.send(rx).await?;
 501            }
 502            debug_assert!(rx.is_empty());
 503            Result::<_, anyhow::Error>::Ok(())
 504        })
 505        .await;
 506    }
 507
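        /// Awaits the per-buffer oneshot results in the order they were queued and forwards them to the
        /// caller, emitting [`SearchResult::LimitReached`] once `MAX_SEARCH_RESULT_FILES` or
        /// `MAX_SEARCH_RESULT_RANGES` is exceeded.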
 508    async fn ensure_matched_ranges_are_reported_in_order(
 509        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 510        tx: Sender<SearchResult>,
 511    ) {
 512        use postage::stream::Stream;
 513        _ = maybe!(async move {
 514            let mut matched_buffers = 0;
 515            let mut matches = 0;
 516            while let Ok(mut next_buffer_matches) = rx.recv().await {
 517                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
 518                    continue;
 519                };
 520
 521                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
 522                    || matches > Search::MAX_SEARCH_RESULT_RANGES
 523                {
 524                    _ = tx.send(SearchResult::LimitReached).await;
 525                    break;
 526                }
 527                matched_buffers += 1;
 528                matches += ranges.len();
 529
 530                tx.send(SearchResult::Buffer { buffer, ranges }).await?;
 531            }
 532            anyhow::Ok(())
 533        })
 534        .await;
 535    }
 536
 537    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
 538        let worktree_store = self.worktree_store.read(cx);
 539        let mut buffers = search_query
 540            .buffers()
 541            .into_iter()
 542            .flatten()
 543            .filter(|buffer| {
 544                let b = buffer.read(cx);
 545                if let Some(file) = b.file() {
 546                    if !search_query.match_path(file.path()) {
 547                        return false;
 548                    }
 549                    if !search_query.include_ignored()
 550                        && let Some(entry) = b
 551                            .entry_id(cx)
 552                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
 553                        && entry.is_ignored
 554                    {
 555                        return false;
 556                    }
 557                }
 558                true
 559            })
 560            .cloned()
 561            .collect::<Vec<_>>();
 562        buffers.sort_by(|a, b| {
 563            let a = a.read(cx);
 564            let b = b.read(cx);
 565            match (a.file(), b.file()) {
 566                (None, None) => a.remote_id().cmp(&b.remote_id()),
 567                (None, Some(_)) => std::cmp::Ordering::Less,
 568                (Some(_), None) => std::cmp::Ordering::Greater,
 569                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
 570            }
 571        });
 572
 573        buffers
 574    }
 575}
 576
 577struct Worker<'search> {
 578    query: &'search SearchQuery,
 579    open_buffers: &'search HashSet<ProjectEntryId>,
 580    candidates: FindSearchCandidates,
 581    /// Ok, we're back in the background: run a full scan & find all matches in a given buffer snapshot.
 582    /// Then, when you're done, share them via the oneshot channel you were given.
 583    find_all_matches_rx: Receiver<(
 584        Entity<Buffer>,
 585        BufferSnapshot,
 586        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 587    )>,
 588}
 589
 590impl Worker<'_> {
 591    async fn run(self) {
 592        let (
 593            input_paths_rx,
 594            confirm_contents_will_match_rx,
 595            mut confirm_contents_will_match_tx,
 596            mut get_buffer_for_full_scan_tx,
 597            fs,
 598        ) = match self.candidates {
 599            FindSearchCandidates::Local {
 600                fs,
 601                input_paths_rx,
 602                confirm_contents_will_match_rx,
 603                confirm_contents_will_match_tx,
 604                get_buffer_for_full_scan_tx,
 605            } => (
 606                input_paths_rx,
 607                confirm_contents_will_match_rx,
 608                confirm_contents_will_match_tx,
 609                get_buffer_for_full_scan_tx,
 610                Some(fs),
 611            ),
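                // Remote and open-buffers-only searches have no local candidate pipeline: hand this
                // worker pre-closed receivers and dangling senders so those select arms stay inert and
                // only `find_all_matches` does real work.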
 612            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => (
 613                unbounded().1,
 614                unbounded().1,
 615                unbounded().0,
 616                unbounded().0,
 617                None,
 618            ),
 619        };
 620        // Workers may finish out of order: WorkerA grabs "find all matches in file/a" (takes 5 minutes), and right after,
 621        // WorkerB grabs "find all matches in file/b" (takes 5 seconds). `ensure_matched_ranges_are_reported_in_order` restores the reporting order downstream.
 622        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
 623        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
 624        let mut scan_path = pin!(input_paths_rx.fuse());
 625
 626        loop {
 627            let handler = RequestHandler {
 628                query: self.query,
 629                open_entries: &self.open_buffers,
 630                fs: fs.as_deref(),
 631                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
 632                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
 633            };
 634            // Whenever we notice that some stage of the pipeline has closed, we don't want to close the subsequent
 635            // stages straight away: another worker might still be about to push a value into them.
 636            // Instead, we replace the current worker's sender with a dummy one.
 637            // That way, a next-stage channel only closes once ALL workers have dropped their real senders.
 638            select_biased! {
 639                find_all_matches = find_all_matches.next() => {
 640                    let Some(matches) = find_all_matches else {
 641                        continue;
 642                    };
 643                    handler.handle_find_all_matches(matches).await;
 644                },
 645                find_first_match = find_first_match.next() => {
 646                    if let Some(buffer_with_at_least_one_match) = find_first_match {
 647                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
 648                    } else {
 649                        get_buffer_for_full_scan_tx = bounded(1).0;
 650                    }
 651
 652                },
 653                scan_path = scan_path.next() => {
 654                    if let Some(path_to_scan) = scan_path {
 655                        handler.handle_scan_path(path_to_scan).await;
 656                    } else {
 657                        // If we're the last worker to notice that this stage is exhausted, dropping our sender closes the next stage.
 658                        confirm_contents_will_match_tx = bounded(1).0;
 659                    }
 660
 661                }
 662                complete => {
 663                    break
 664                },
 665
 666            }
 667        }
 668    }
 669}
 670
 671struct RequestHandler<'worker> {
 672    query: &'worker SearchQuery,
 673    fs: Option<&'worker dyn Fs>,
 674    open_entries: &'worker HashSet<ProjectEntryId>,
 675    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
 676    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
 677}
 678
 679impl RequestHandler<'_> {
 680    async fn handle_find_all_matches(
 681        &self,
 682        (buffer, snapshot, mut report_matches): (
 683            Entity<Buffer>,
 684            BufferSnapshot,
 685            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 686        ),
 687    ) {
 688        let ranges = self
 689            .query
 690            .search(&snapshot, None)
 691            .await
 692            .iter()
 693            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
 694            .collect::<Vec<_>>();
 695
 696        _ = report_matches.send((buffer, ranges)).await;
 697    }
 698
 699    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
 700        _ = maybe!(async move {
 701            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
 702            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
 703                return anyhow::Ok(());
 704            };
 705
 706            let mut file = BufReader::new(file);
 707            let file_start = file.fill_buf()?;
 708
 709            if let Err(utf8_error) = std::str::from_utf8(file_start)
 710                && utf8_error.error_len().is_some()
 711            {
 712                // Before attempting to match the file's contents, throw away files with invalid UTF-8 sequences early on;
 713                // that way we can still match files in a streaming fashion without having to look at "obviously binary" files.
 714                log::debug!(
 715                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {}", utf8_error.valid_up_to()
 716                );
 717                return Ok(());
 718            }
 719
 720            if self.query.detect(file).unwrap_or(false) {
 721                // Yes, we should scan the whole file.
 722                entry.should_scan_tx.send(entry.path).await?;
 723            }
 724            Ok(())
 725        }).await;
 726    }
 727
 728    async fn handle_scan_path(&self, req: InputPath) {
 729        _ = maybe!(async move {
 730            let InputPath {
 731                entry,
 732
 733                snapshot,
 734                should_scan_tx,
 735            } = req;
 736
 737            if entry.is_fifo || !entry.is_file() {
 738                return Ok(());
 739            }
 740
 741            if self.query.filters_path() {
 742                let matched_path = if self.query.match_full_paths() {
 743                    let mut full_path = snapshot.root_name().to_owned();
 744                    full_path.push(&entry.path);
 745                    self.query.match_path(&full_path)
 746                } else {
 747                    self.query.match_path(&entry.path)
 748                };
 749                if !matched_path {
 750                    return Ok(());
 751                }
 752            }
 753
 754            if self.open_entries.contains(&entry.id) {
 755                // The buffer is already in memory and that's the version we want to scan;
 756                // hence skip the dilly-dally and look for all matches straight away.
 757                self.get_buffer_for_full_scan_tx
 758                    .send(ProjectPath {
 759                        worktree_id: snapshot.id(),
 760                        path: entry.path.clone(),
 761                    })
 762                    .await?;
 763            } else {
 764                self.confirm_contents_will_match_tx
 765                    .send(MatchingEntry {
 766                        should_scan_tx,
 767                        worktree_root: snapshot.abs_path().clone(),
 768                        path: ProjectPath {
 769                            worktree_id: snapshot.id(),
 770                            path: entry.path.clone(),
 771                        },
 772                    })
 773                    .await?;
 774            }
 775
 776            anyhow::Ok(())
 777        })
 778        .await;
 779    }
 780}
 781
 782struct InputPath {
 783    entry: Entry,
 784    snapshot: Snapshot,
 785    should_scan_tx: oneshot::Sender<ProjectPath>,
 786}
 787
 788struct MatchingEntry {
 789    worktree_root: Arc<Path>,
 790    path: ProjectPath,
 791    should_scan_tx: oneshot::Sender<ProjectPath>,
 792}
 793
 794/// This struct encapsulates the logic to decide whether a given gitignored directory should be
 795/// scanned based on include/exclude patterns of a search query (as include/exclude parameters may match paths inside it).
 796/// It effectively inverts the glob match: given a glob pattern like `src/**/` and a parent path like `src`, we need to decide whether the parent
 797/// may contain glob hits.
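    ///
    /// For example, with `files_to_include` containing `src/lib` (as in `test_path_inclusion_matcher`
    /// below), the retained prefix is `src/lib`: the ignored directory `src/data` is skipped, while
    /// `src` itself (an ancestor of the prefix) would still be scanned.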
 798struct PathInclusionMatcher {
 799    included: BTreeSet<PathBuf>,
 800    query: Arc<SearchQuery>,
 801}
 802
 803impl PathInclusionMatcher {
 804    fn new(query: Arc<SearchQuery>) -> Self {
 805        let mut included = BTreeSet::new();
 806        // To do an inverse glob match, we split each glob into its prefix and the glob part.
 807        // For example, `src/**/*.rs` becomes `src/` and `**/*.rs`. The glob part gets dropped.
 808        // Then, when checking whether a given directory should be scanned, we check whether it is an ancestor or a descendant of any glob prefix.
 809        if query.filters_path() {
 810            included.extend(
 811                query
 812                    .files_to_include()
 813                    .sources()
 814                    .flat_map(|glob| Some(wax::Glob::new(glob).ok()?.partition().0)),
 815            );
 816        }
 817        Self { included, query }
 818    }
 819
 820    fn should_scan_gitignored_dir(
 821        &self,
 822        entry: &Entry,
 823        snapshot: &Snapshot,
 824        worktree_settings: &WorktreeSettings,
 825    ) -> bool {
 826        if !entry.is_ignored || !entry.kind.is_unloaded() {
 827            return false;
 828        }
 829        if !self.query.include_ignored() {
 830            return false;
 831        }
 832        if worktree_settings.is_path_excluded(&entry.path) {
 833            return false;
 834        }
 835        if !self.query.filters_path() {
 836            return true;
 837        }
 838
 839        let as_abs_path = LazyCell::new(move || snapshot.absolutize(&entry.path));
 840        let entry_path = &entry.path;
 841        // Check exclusions (pruning):
 842        // if the current path is a child of an excluded path, we stop.
 843        let is_excluded = self.path_is_definitely_excluded(&entry_path, snapshot);
 844
 845        if is_excluded {
 846            return false;
 847        }
 848
 849        // Check inclusions (traversal).
 850        if self.included.is_empty() {
 851            return true;
 852        }
 853
 854        // We scan if the current path is a descendant of an include prefix
 855        // OR if the current path is an ancestor of an include prefix (we need to go deeper to find it).
 856        let is_included = self.included.iter().any(|prefix| {
 857            let (prefix_matches_entry, entry_matches_prefix) = if prefix.is_absolute() {
 858                (
 859                    prefix.starts_with(&**as_abs_path),
 860                    as_abs_path.starts_with(prefix),
 861                )
 862            } else {
 863                RelPath::new(prefix, snapshot.path_style()).map_or((false, false), |prefix| {
 864                    (
 865                        prefix.starts_with(entry_path),
 866                        entry_path.starts_with(&prefix),
 867                    )
 868                })
 869            };
 870
 871            // Logic:
 872            // 1. entry_matches_prefix: We are inside the target zone (e.g. glob: src/, current: src/lib/). Keep scanning.
 873            // 2. prefix_matches_entry: We are above the target zone (e.g. glob: src/foo/, current: src/). Keep scanning to reach foo.
 874            prefix_matches_entry || entry_matches_prefix
 875        });
 876
 877        is_included
 878    }
 879    fn path_is_definitely_excluded(&self, path: &RelPath, snapshot: &Snapshot) -> bool {
 880        if self.query.files_to_exclude().sources().next().is_some() {
 881            let mut path = if self.query.match_full_paths() {
 882                let mut full_path = snapshot.root_name().to_owned();
 883                full_path.push(path);
 884                full_path
 885            } else {
 886                path.to_owned()
 887            };
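                // Walk up the ancestor chain: the path is definitely excluded if it, or any of its
                // ancestors, matches an exclusion pattern.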
 888            loop {
 889                if self.query.files_to_exclude().is_match(&path) {
 890                    return true;
 891                } else if !path.pop() {
 892                    return false;
 893                }
 894            }
 895        } else {
 896            false
 897        }
 898    }
 899}
 900
 901#[cfg(test)]
 902mod tests {
 903    use super::*;
 904    use fs::FakeFs;
 905    use serde_json::json;
 906    use settings::Settings;
 907    use util::{
 908        path,
 909        paths::{PathMatcher, PathStyle},
 910        rel_path::RelPath,
 911    };
 912    use worktree::{Entry, EntryKind, WorktreeSettings};
 913
 914    use crate::{
 915        Project, project_search::PathInclusionMatcher, project_tests::init_test,
 916        search::SearchQuery,
 917    };
 918
 919    #[gpui::test]
 920    async fn test_path_inclusion_matcher(cx: &mut gpui::TestAppContext) {
 921        init_test(cx);
 922
 923        let fs = FakeFs::new(cx.background_executor.clone());
 924        fs.insert_tree(
 925            "/root",
 926            json!({
 927                ".gitignore": "src/data/\n",
 928                "src": {
 929                    "data": {
 930                        "main.csv": "field_1,field_2,field_3",
 931                    },
 932                    "lib": {
 933                        "main.txt": "Are you familiar with fields?",
 934                    },
 935                },
 936            }),
 937        )
 938        .await;
 939
 940        let project = Project::test(fs.clone(), [path!("/root").as_ref()], cx).await;
 941        let worktree = project.update(cx, |project, cx| project.worktrees(cx).next().unwrap());
 942        let (worktree_settings, worktree_snapshot) = worktree.update(cx, |worktree, cx| {
 943            let settings_location = worktree.settings_location(cx);
 944            (
 945                WorktreeSettings::get(Some(settings_location), cx).clone(),
 946                worktree.snapshot(),
 947            )
 948        });
 949
 950        // Manually create a test entry for the gitignored directory since it won't
 951        // be loaded by the worktree
 952        let entry = Entry {
 953            id: ProjectEntryId::from_proto(1),
 954            kind: EntryKind::UnloadedDir,
 955            path: Arc::from(RelPath::unix(Path::new("src/data")).unwrap()),
 956            inode: 0,
 957            mtime: None,
 958            canonical_path: None,
 959            is_ignored: true,
 960            is_hidden: false,
 961            is_always_included: false,
 962            is_external: false,
 963            is_private: false,
 964            size: 0,
 965            char_bag: Default::default(),
 966            is_fifo: false,
 967        };
 968
 969        // 1. Test searching for `field`, including ignored files without any
 970        // inclusion and exclusion filters.
 971        let include_ignored = true;
 972        let files_to_include = PathMatcher::default();
 973        let files_to_exclude = PathMatcher::default();
 974        let match_full_paths = false;
 975        let search_query = SearchQuery::text(
 976            "field",
 977            false,
 978            false,
 979            include_ignored,
 980            files_to_include,
 981            files_to_exclude,
 982            match_full_paths,
 983            None,
 984        )
 985        .unwrap();
 986
 987        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
 988        assert!(path_matcher.should_scan_gitignored_dir(
 989            &entry,
 990            &worktree_snapshot,
 991            &worktree_settings
 992        ));
 993
 994        // 2. Test searching for `field`, including ignored files but updating
 995        // `files_to_include` to only include files under `src/lib`.
 996        let include_ignored = true;
 997        let files_to_include = PathMatcher::new(vec!["src/lib"], PathStyle::Posix).unwrap();
 998        let files_to_exclude = PathMatcher::default();
 999        let match_full_paths = false;
1000        let search_query = SearchQuery::text(
1001            "field",
1002            false,
1003            false,
1004            include_ignored,
1005            files_to_include,
1006            files_to_exclude,
1007            match_full_paths,
1008            None,
1009        )
1010        .unwrap();
1011
1012        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
1013        assert!(!path_matcher.should_scan_gitignored_dir(
1014            &entry,
1015            &worktree_snapshot,
1016            &worktree_settings
1017        ));
1018    }
1019}