project_search.rs

   1use std::{
   2    cell::LazyCell,
   3    collections::BTreeSet,
   4    io::{BufRead, BufReader},
   5    ops::Range,
   6    path::{Path, PathBuf},
   7    pin::pin,
   8    sync::Arc,
   9};
  10
  11use anyhow::Context;
  12use collections::HashSet;
  13use fs::Fs;
  14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
  15use gpui::{App, AppContext, AsyncApp, Entity, Task};
  16use language::{Buffer, BufferSnapshot};
  17use parking_lot::Mutex;
  18use postage::oneshot;
  19use rpc::{AnyProtoClient, proto};
  20use smol::{
  21    channel::{Receiver, Sender, bounded, unbounded},
  22    future::FutureExt,
  23};
  24
  25use text::BufferId;
  26use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
  27use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
  28
  29use crate::{
  30    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
  31    buffer_store::BufferStore,
  32    search::{SearchQuery, SearchResult},
  33    worktree_store::WorktreeStore,
  34};
  35
/// A project-wide search that has been configured but not started yet.
///
/// Built with [`Search::local`], [`Search::remote`] or [`Search::open_buffers_only`] and
/// converted into a [`SearchResultsHandle`] with [`Search::into_handle`].
pub struct Search {
    /// Buffer store of the project; used to enumerate buffers that are already open and to open
    /// (or wait for) the buffers that turn out to be search candidates.
    buffer_store: Entity<BufferStore>,
    /// Worktree store of the project; used to resolve worktree entries of open buffers.
    worktree_store: Entity<WorktreeStore>,
    /// Upper bound on the number of candidate buffers. It is decremented once for every
    /// searchable buffer that has no worktree entry, as those are always searched.
    limit: usize,
    /// Strategy used to discover candidate buffers.
    kind: SearchKind,
}
  42
/// Represents search setup, before it is turned into a [`SearchResultsHandle`] with [`Search::into_handle`].
enum SearchKind {
    /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match.
    Local {
        /// File system used to read file contents while looking for a first match.
        fs: Arc<dyn Fs>,
        /// Worktrees whose files are fed into the search.
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
    Remote {
        /// Client used to send the `FindSearchCandidates` request.
        client: AnyProtoClient,
        /// Sent as the `project_id` of the request.
        remote_id: u64,
        /// Passed to `Project::retain_remotely_created_models_impl` for the duration of the request.
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
    /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host.
    OpenBuffersOnly,
}
  59
/// Represents results of project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for
/// at most one match in each file.
///
/// The search itself only starts once [`SearchResultsHandle::results`] or
/// [`SearchResultsHandle::matching_buffers`] is called.
#[must_use]
pub struct SearchResultsHandle {
    /// Receives full search results (buffers together with their matching ranges).
    results: Receiver<SearchResult>,
    /// Receives buffers that are candidates for containing a match.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// Deferred start of the search; returns the task that drives it.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
  69
/// A running search: the stream of produced values plus the task that produces them.
pub struct SearchResults<T> {
    /// Task driving the search. It is stored next to the receiver so that it is kept for as
    /// long as the results are being consumed.
    pub _task_handle: Task<()>,
    /// Stream of values produced by the search.
    pub rx: Receiver<T>,
}
  74impl SearchResultsHandle {
  75    pub fn results(self, cx: &mut App) -> SearchResults<SearchResult> {
  76        SearchResults {
  77            _task_handle: (self.trigger_search)(cx),
  78            rx: self.results,
  79        }
  80    }
  81    pub fn matching_buffers(self, cx: &mut App) -> SearchResults<Entity<Buffer>> {
  82        SearchResults {
  83            _task_handle: (self.trigger_search)(cx),
  84            rx: self.matching_buffers,
  85        }
  86    }
  87}
  88
/// Per-worker view of how search candidates are discovered. Every [`Worker`] gets its own clone.
#[derive(Clone)]
enum FindSearchCandidates {
    /// Candidates are discovered by scanning the local file system.
    Local {
        /// File system used to read file contents.
        fs: Arc<dyn Fs>,
        /// Start off with all paths in project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        /// Put another way: filter out files that can't match (without looking at file contents)
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        /// Receiving half of the channel above; workers pull "find a first match" requests from it.
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    },
    /// Candidates are provided by the remote host; workers have no paths to scan.
    Remote,
    /// Candidates are the already-loaded buffers; workers have no paths to scan.
    OpenBuffersOnly,
}
 108
 109impl Search {
 110    pub fn local(
 111        fs: Arc<dyn Fs>,
 112        buffer_store: Entity<BufferStore>,
 113        worktree_store: Entity<WorktreeStore>,
 114        limit: usize,
 115        cx: &mut App,
 116    ) -> Self {
 117        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
 118        Self {
 119            kind: SearchKind::Local { fs, worktrees },
 120            buffer_store,
 121            worktree_store,
 122            limit,
 123        }
 124    }
 125
 126    pub(crate) fn remote(
 127        buffer_store: Entity<BufferStore>,
 128        worktree_store: Entity<WorktreeStore>,
 129        limit: usize,
 130        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
 131    ) -> Self {
 132        Self {
 133            kind: SearchKind::Remote {
 134                client: client_state.0,
 135                remote_id: client_state.1,
 136                models: client_state.2,
 137            },
 138            buffer_store,
 139            worktree_store,
 140            limit,
 141        }
 142    }
 143    pub(crate) fn open_buffers_only(
 144        buffer_store: Entity<BufferStore>,
 145        worktree_store: Entity<WorktreeStore>,
 146        limit: usize,
 147    ) -> Self {
 148        Self {
 149            kind: SearchKind::OpenBuffersOnly,
 150            buffer_store,
 151            worktree_store,
 152            limit,
 153        }
 154    }
 155
    /// Threshold on the number of buffers reported by a single search; once it is exceeded,
    /// `SearchResult::LimitReached` is sent and reporting stops.
    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    /// Threshold on the total number of matched ranges reported by a single search; once it is
    /// exceeded, `SearchResult::LimitReached` is sent and reporting stops.
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 158    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
 159    /// or full search results.
 160    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
 161        let mut open_buffers = HashSet::default();
 162        let mut unnamed_buffers = Vec::new();
 163        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 164        let buffers = self.buffer_store.read(cx);
 165        for handle in buffers.buffers() {
 166            let buffer = handle.read(cx);
 167            if !buffers.is_searchable(&buffer.remote_id()) {
 168                continue;
 169            } else if let Some(entry_id) = buffer.entry_id(cx) {
 170                open_buffers.insert(entry_id);
 171            } else {
 172                self.limit = self.limit.saturating_sub(1);
 173                unnamed_buffers.push(handle)
 174            };
 175        }
 176        let open_buffers = Arc::new(open_buffers);
 177        let executor = cx.background_executor().clone();
 178        let (tx, rx) = unbounded();
 179        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
 180        let matching_buffers = grab_buffer_snapshot_rx.clone();
 181        let trigger_search = Box::new(move |cx: &mut App| {
 182            cx.spawn(async move |cx| {
 183                for buffer in unnamed_buffers {
 184                    _ = grab_buffer_snapshot_tx.send(buffer).await;
 185                }
 186
 187                let (find_all_matches_tx, find_all_matches_rx) =
 188                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
 189                let query = Arc::new(query);
 190                let (candidate_searcher, tasks) = match self.kind {
 191                    SearchKind::OpenBuffersOnly => {
 192                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
 193                        else {
 194                            return;
 195                        };
 196                        let fill_requests = cx
 197                            .background_spawn(async move {
 198                                for buffer in open_buffers {
 199                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
 200                                        return;
 201                                    }
 202                                }
 203                            })
 204                            .boxed_local();
 205                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
 206                    }
 207                    SearchKind::Local {
 208                        fs,
 209                        ref mut worktrees,
 210                    } => {
 211                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 212                            unbounded();
 213                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
 214                            bounded(64);
 215                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
 216
 217                        let (input_paths_tx, input_paths_rx) = unbounded();
 218                        let tasks = vec![
 219                            cx.spawn(Self::provide_search_paths(
 220                                std::mem::take(worktrees),
 221                                query.clone(),
 222                                input_paths_tx,
 223                                sorted_search_results_tx,
 224                            ))
 225                            .boxed_local(),
 226                            Self::open_buffers(
 227                                self.buffer_store,
 228                                get_buffer_for_full_scan_rx,
 229                                grab_buffer_snapshot_tx,
 230                                cx.clone(),
 231                            )
 232                            .boxed_local(),
 233                            cx.background_spawn(Self::maintain_sorted_search_results(
 234                                sorted_search_results_rx,
 235                                get_buffer_for_full_scan_tx,
 236                                self.limit,
 237                            ))
 238                            .boxed_local(),
 239                        ];
 240                        (
 241                            FindSearchCandidates::Local {
 242                                fs,
 243                                confirm_contents_will_match_tx,
 244                                confirm_contents_will_match_rx,
 245                                input_paths_rx,
 246                            },
 247                            tasks,
 248                        )
 249                    }
 250                    SearchKind::Remote {
 251                        client,
 252                        remote_id,
 253                        models,
 254                    } => {
 255                        let request = client.request(proto::FindSearchCandidates {
 256                            project_id: remote_id,
 257                            query: Some(query.to_proto()),
 258                            limit: self.limit as _,
 259                        });
 260                        let weak_buffer_store = self.buffer_store.downgrade();
 261                        let buffer_store = self.buffer_store;
 262                        let Ok(guard) = cx.update(|cx| {
 263                            Project::retain_remotely_created_models_impl(
 264                                &models,
 265                                &buffer_store,
 266                                &self.worktree_store,
 267                                cx,
 268                            )
 269                        }) else {
 270                            return;
 271                        };
 272
 273                        let issue_remote_buffers_request = cx
 274                            .spawn(async move |cx| {
 275                                let _ = maybe!(async move {
 276                                    let response = request.await?;
 277                                    for buffer_id in response.buffer_ids {
 278                                        let buffer_id = BufferId::new(buffer_id)?;
 279                                        let buffer = weak_buffer_store
 280                                            .update(cx, |buffer_store, cx| {
 281                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
 282                                            })?
 283                                            .await?;
 284                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
 285                                    }
 286
 287                                    drop(guard);
 288                                    anyhow::Ok(())
 289                                })
 290                                .await
 291                                .log_err();
 292                            })
 293                            .boxed_local();
 294                        (
 295                            FindSearchCandidates::Remote,
 296                            vec![issue_remote_buffers_request],
 297                        )
 298                    }
 299                };
 300
 301                let should_find_all_matches = !tx.is_closed();
 302
 303                let _executor = executor.clone();
 304                let worker_pool = executor.spawn(async move {
 305                    let num_cpus = _executor.num_cpus();
 306
 307                    assert!(num_cpus > 0);
 308                    _executor
 309                        .scoped(|scope| {
 310                            for _ in 0..num_cpus - 1 {
 311                                let worker = Worker {
 312                                    query: query.clone(),
 313                                    open_buffers: open_buffers.clone(),
 314                                    candidates: candidate_searcher.clone(),
 315                                    find_all_matches_rx: find_all_matches_rx.clone(),
 316                                };
 317                                scope.spawn(worker.run());
 318                            }
 319
 320                            drop(find_all_matches_rx);
 321                            drop(candidate_searcher);
 322                        })
 323                        .await;
 324                });
 325
 326                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
 327                // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
 328                // just the files. *They are using the same stream as the guts of the project search do*.
 329                // This means that we cannot grab values off of that stream unless it's strictly needed for making a progress in project search.
 330                //
 331                // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
 332                // in all matches, running that task unconditionally would hinder caller's ability to observe all matching file paths.
 333                let buffer_snapshots = if should_find_all_matches {
 334                    Some(
 335                        Self::grab_buffer_snapshots(
 336                            grab_buffer_snapshot_rx,
 337                            find_all_matches_tx,
 338                            sorted_matches_tx,
 339                            cx.clone(),
 340                        )
 341                        .boxed_local(),
 342                    )
 343                } else {
 344                    drop(find_all_matches_tx);
 345
 346                    None
 347                };
 348                let ensure_matches_are_reported_in_order = if should_find_all_matches {
 349                    Some(
 350                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
 351                            .boxed_local(),
 352                    )
 353                } else {
 354                    drop(tx);
 355                    None
 356                };
 357
 358                futures::future::join_all(
 359                    [worker_pool.boxed_local()]
 360                        .into_iter()
 361                        .chain(buffer_snapshots)
 362                        .chain(ensure_matches_are_reported_in_order)
 363                        .chain(tasks),
 364                )
 365                .await;
 366            })
 367        });
 368
 369        SearchResultsHandle {
 370            results: rx,
 371            matching_buffers,
 372            trigger_search,
 373        }
 374    }
 375
 376    fn provide_search_paths(
 377        worktrees: Vec<Entity<Worktree>>,
 378        query: Arc<SearchQuery>,
 379        tx: Sender<InputPath>,
 380        results: Sender<oneshot::Receiver<ProjectPath>>,
 381    ) -> impl AsyncFnOnce(&mut AsyncApp) {
 382        async move |cx| {
 383            _ = maybe!(async move {
 384                let gitignored_tracker = PathInclusionMatcher::new(query.clone());
 385                let include_ignored = query.include_ignored();
 386                for worktree in worktrees {
 387                    let (mut snapshot, worktree_settings) = worktree
 388                        .read_with(cx, |this, _| {
 389                            Some((this.snapshot(), this.as_local()?.settings()))
 390                        })?
 391                        .context("The worktree is not local")?;
 392                    if query.include_ignored() {
 393                        // Pre-fetch all of the ignored directories as they're going to be searched.
 394                        let mut entries_to_refresh = vec![];
 395
 396                        for entry in snapshot.entries(query.include_ignored(), 0) {
 397                            if gitignored_tracker.should_scan_gitignored_dir(
 398                                entry,
 399                                &snapshot,
 400                                &worktree_settings,
 401                            ) {
 402                                entries_to_refresh.push(entry.path.clone());
 403                            }
 404                        }
 405                        let barrier = worktree.update(cx, |this, _| {
 406                            let local = this.as_local_mut()?;
 407                            let barrier = entries_to_refresh
 408                                .into_iter()
 409                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
 410                                .collect::<Vec<_>>();
 411                            Some(barrier)
 412                        })?;
 413                        if let Some(barriers) = barrier {
 414                            futures::future::join_all(barriers).await;
 415                        }
 416                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
 417                    }
 418                    let tx = tx.clone();
 419                    let results = results.clone();
 420
 421                    cx.background_executor()
 422                        .spawn(async move {
 423                            for entry in snapshot.files(include_ignored, 0) {
 424                                let (should_scan_tx, should_scan_rx) = oneshot::channel();
 425
 426                                let Ok(_) = tx
 427                                    .send(InputPath {
 428                                        entry: entry.clone(),
 429                                        snapshot: snapshot.clone(),
 430                                        should_scan_tx,
 431                                    })
 432                                    .await
 433                                else {
 434                                    return;
 435                                };
 436                                if results.send(should_scan_rx).await.is_err() {
 437                                    return;
 438                                };
 439                            }
 440                        })
 441                        .await;
 442                }
 443                anyhow::Ok(())
 444            })
 445            .await;
 446        }
 447    }
 448
 449    async fn maintain_sorted_search_results(
 450        rx: Receiver<oneshot::Receiver<ProjectPath>>,
 451        paths_for_full_scan: Sender<ProjectPath>,
 452        limit: usize,
 453    ) {
 454        let mut rx = pin!(rx);
 455        let mut matched = 0;
 456        while let Some(mut next_path_result) = rx.next().await {
 457            let Some(successful_path) = next_path_result.next().await else {
 458                // This file did not produce a match, hence skip it.
 459                continue;
 460            };
 461            if paths_for_full_scan.send(successful_path).await.is_err() {
 462                return;
 463            };
 464            matched += 1;
 465            if matched >= limit {
 466                break;
 467            }
 468        }
 469    }
 470
 471    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
 472    async fn open_buffers(
 473        buffer_store: Entity<BufferStore>,
 474        rx: Receiver<ProjectPath>,
 475        find_all_matches_tx: Sender<Entity<Buffer>>,
 476        mut cx: AsyncApp,
 477    ) {
 478        let mut rx = pin!(rx.ready_chunks(64));
 479        _ = maybe!(async move {
 480            while let Some(requested_paths) = rx.next().await {
 481                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
 482                    requested_paths
 483                        .into_iter()
 484                        .map(|path| this.open_buffer(path, cx))
 485                        .collect::<FuturesOrdered<_>>()
 486                })?;
 487
 488                while let Some(buffer) = buffers.next().await {
 489                    if let Some(buffer) = buffer.log_err() {
 490                        find_all_matches_tx.send(buffer).await?;
 491                    }
 492                }
 493            }
 494            Result::<_, anyhow::Error>::Ok(())
 495        })
 496        .await;
 497    }
 498
 499    async fn grab_buffer_snapshots(
 500        rx: Receiver<Entity<Buffer>>,
 501        find_all_matches_tx: Sender<(
 502            Entity<Buffer>,
 503            BufferSnapshot,
 504            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 505        )>,
 506        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 507        mut cx: AsyncApp,
 508    ) {
 509        _ = maybe!(async move {
 510            while let Ok(buffer) = rx.recv().await {
 511                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
 512                let (tx, rx) = oneshot::channel();
 513                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
 514                results.send(rx).await?;
 515            }
 516            debug_assert!(rx.is_empty());
 517            Result::<_, anyhow::Error>::Ok(())
 518        })
 519        .await;
 520    }
 521
 522    async fn ensure_matched_ranges_are_reported_in_order(
 523        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 524        tx: Sender<SearchResult>,
 525    ) {
 526        use postage::stream::Stream;
 527        _ = maybe!(async move {
 528            let mut matched_buffers = 0;
 529            let mut matches = 0;
 530            while let Ok(mut next_buffer_matches) = rx.recv().await {
 531                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
 532                    continue;
 533                };
 534
 535                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
 536                    || matches > Search::MAX_SEARCH_RESULT_RANGES
 537                {
 538                    _ = tx.send(SearchResult::LimitReached).await;
 539                    break;
 540                }
 541                matched_buffers += 1;
 542                matches += ranges.len();
 543
 544                _ = tx.send(SearchResult::Buffer { buffer, ranges }).await?;
 545            }
 546            anyhow::Ok(())
 547        })
 548        .await;
 549    }
 550
 551    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
 552        let worktree_store = self.worktree_store.read(cx);
 553        let mut buffers = search_query
 554            .buffers()
 555            .into_iter()
 556            .flatten()
 557            .filter(|buffer| {
 558                let b = buffer.read(cx);
 559                if let Some(file) = b.file() {
 560                    if !search_query.match_path(file.path()) {
 561                        return false;
 562                    }
 563                    if !search_query.include_ignored()
 564                        && let Some(entry) = b
 565                            .entry_id(cx)
 566                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
 567                        && entry.is_ignored
 568                    {
 569                        return false;
 570                    }
 571                }
 572                true
 573            })
 574            .cloned()
 575            .collect::<Vec<_>>();
 576        buffers.sort_by(|a, b| {
 577            let a = a.read(cx);
 578            let b = b.read(cx);
 579            match (a.file(), b.file()) {
 580                (None, None) => a.remote_id().cmp(&b.remote_id()),
 581                (None, Some(_)) => std::cmp::Ordering::Less,
 582                (Some(_), None) => std::cmp::Ordering::Greater,
 583                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
 584            }
 585        });
 586
 587        buffers
 588    }
 589}
 590
/// A single background worker of the project search. Several workers run concurrently and
/// share the same channels.
struct Worker {
    /// The query being searched for.
    query: Arc<SearchQuery>,
    /// Worktree entries whose buffers were already open when the search was prepared.
    open_buffers: Arc<HashSet<ProjectEntryId>>,
    /// Source of candidate paths for this worker.
    candidates: FindSearchCandidates,
    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
    /// Then, when you're done, share them via the channel you were given.
    find_all_matches_rx: Receiver<(
        Entity<Buffer>,
        BufferSnapshot,
        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
    )>,
}
 603
impl Worker {
    /// Runs the worker until all three of its input streams are exhausted.
    ///
    /// The worker serves three kinds of requests: finding all matches in a buffer snapshot,
    /// finding a first match in a file on disk, and filtering a candidate path. `select_biased!`
    /// tries them in that order, so requests from later pipeline stages are preferred.
    async fn run(self) {
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            fs,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                fs,
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                Some(fs),
            ),
            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => {
                // There are no paths to scan in these modes: use channel halves whose
                // counterparts are dropped right away, and no file system.
                (unbounded().1, unbounded().1, unbounded().0, None)
            }
        };
        // WorkerA: grabs a request for "find all matches in file/a" <- takes 5 minutes
        // right after: WorkerB: grabs a request for "find all matches in file/b" <- takes 5 seconds
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            // The handler is rebuilt on every iteration because the sender it borrows may be
            // replaced below.
            let handler = RequestHandler {
                query: &self.query,
                open_entries: &self.open_buffers,
                fs: fs.as_deref(),
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
            };
            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
            // steps straight away. Another worker might be about to produce a value that will
            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
            // That way, we'll only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    let Some(matches) = find_all_matches else {
                        continue;
                    };
                    handler.handle_find_all_matches(matches).await;
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this is not producing values, close the upstream.
                        confirm_contents_will_match_tx = bounded(1).0;
                    }

                 }
                 // Every stream above has terminated: nothing is left to do.
                 complete => {
                     break
                },

            }
        }
    }
}
 673
/// Borrowed view of a [`Worker`]'s state used to serve a single request.
struct RequestHandler<'worker> {
    /// The query being searched for.
    query: &'worker SearchQuery,
    /// File system used to read files from disk; `None` when the worker has no paths to scan.
    fs: Option<&'worker dyn Fs>,
    /// Worktree entries whose buffers were already open when the search was prepared.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Channel on which "check that this file contains at least one match" requests are queued.
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
}
 680
 681impl RequestHandler<'_> {
 682    async fn handle_find_all_matches(
 683        &self,
 684        (buffer, snapshot, mut report_matches): (
 685            Entity<Buffer>,
 686            BufferSnapshot,
 687            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 688        ),
 689    ) {
 690        let ranges = self
 691            .query
 692            .search(&snapshot, None)
 693            .await
 694            .iter()
 695            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
 696            .collect::<Vec<_>>();
 697
 698        _ = report_matches.send((buffer, ranges)).await;
 699    }
 700
 701    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
 702        _=maybe!(async move {
 703            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
 704            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
 705                return anyhow::Ok(());
 706            };
 707
 708            let mut file = BufReader::new(file);
 709            let file_start = file.fill_buf()?;
 710
 711            if let Err(Some(starting_position)) =
 712            std::str::from_utf8(file_start).map_err(|e| e.error_len())
 713            {
 714                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
 715                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
 716                log::debug!(
 717                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
 718                );
 719                return Ok(());
 720            }
 721
 722            if self.query.detect(file).unwrap_or(false) {
 723                // Yes, we should scan the whole file.
 724                entry.should_scan_tx.send(entry.path).await?;
 725            }
 726            Ok(())
 727        }).await;
 728    }
 729
 730    async fn handle_scan_path(&self, req: InputPath) {
 731        _ = maybe!(async move {
 732            let InputPath {
 733                entry,
 734                snapshot,
 735                mut should_scan_tx,
 736            } = req;
 737
 738            if entry.is_fifo || !entry.is_file() {
 739                return Ok(());
 740            }
 741
 742            if self.query.filters_path() {
 743                let matched_path = if self.query.match_full_paths() {
 744                    let mut full_path = snapshot.root_name().to_owned();
 745                    full_path.push(&entry.path);
 746                    self.query.match_path(&full_path)
 747                } else {
 748                    self.query.match_path(&entry.path)
 749                };
 750                if !matched_path {
 751                    return Ok(());
 752                }
 753            }
 754
 755            if self.open_entries.contains(&entry.id) {
 756                // The buffer is already in memory and that's the version we want to scan;
 757                // hence skip the dilly-dally and look for all matches straight away.
 758                should_scan_tx
 759                    .send(ProjectPath {
 760                        worktree_id: snapshot.id(),
 761                        path: entry.path.clone(),
 762                    })
 763                    .await?;
 764            } else {
 765                self.confirm_contents_will_match_tx
 766                    .send(MatchingEntry {
 767                        should_scan_tx: should_scan_tx,
 768                        worktree_root: snapshot.abs_path().clone(),
 769                        path: ProjectPath {
 770                            worktree_id: snapshot.id(),
 771                            path: entry.path.clone(),
 772                        },
 773                    })
 774                    .await?;
 775            }
 776
 777            anyhow::Ok(())
 778        })
 779        .await;
 780    }
 781}
 782
/// A worktree entry queued for path-level filtering in `RequestHandler::handle_scan_path`.
struct InputPath {
    /// The worktree entry under consideration.
    entry: Entry,
    /// Worktree snapshot used to resolve the entry's worktree id, root name and absolute root path.
    snapshot: Snapshot,
    /// Channel over which the entry's `ProjectPath` is sent once it is deemed worth a full scan.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
 788
/// A file that passed path filtering and awaits the on-disk content check in
/// `RequestHandler::handle_find_first_match`.
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path` to locate the file on disk.
    worktree_root: Arc<Path>,
    /// Location of the file within the project (worktree id plus worktree-relative path).
    path: ProjectPath,
    /// Channel over which `path` is sent if the file's contents turn out to match the query.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
 794
/// Encapsulates the logic that decides whether a given gitignored, not-yet-loaded directory should
/// be scanned, based on the include/exclude patterns of a search query (those patterns may match
/// paths inside it).
/// This is roughly the inverse of a glob match: given a glob pattern like `src/**/` and a parent
/// path like `src`, we need to decide whether the parent may contain glob hits.
struct PathInclusionMatcher {
    /// Literal path prefixes of the query's include globs (the part preceding any glob syntax).
    included: BTreeSet<PathBuf>,
    /// The search query whose include/exclude filters drive the decision.
    query: Arc<SearchQuery>,
}
 803
 804impl PathInclusionMatcher {
 805    fn new(query: Arc<SearchQuery>) -> Self {
 806        let mut included = BTreeSet::new();
 807        // To do an inverse glob match, we split each glob into it's prefix and the glob part.
 808        // For example, `src/**/*.rs` becomes `src/` and `**/*.rs`. The glob part gets dropped.
 809        // Then, when checking whether a given directory should be scanned, we check whether it is a non-empty substring of any glob prefix.
 810        if query.filters_path() {
 811            included.extend(
 812                query
 813                    .files_to_include()
 814                    .sources()
 815                    .flat_map(|glob| Some(wax::Glob::new(glob).ok()?.partition().0)),
 816            );
 817        }
 818        Self { included, query }
 819    }
 820
 821    fn should_scan_gitignored_dir(
 822        &self,
 823        entry: &Entry,
 824        snapshot: &Snapshot,
 825        worktree_settings: &WorktreeSettings,
 826    ) -> bool {
 827        if !entry.is_ignored || !entry.kind.is_unloaded() {
 828            return false;
 829        }
 830        if !self.query.include_ignored() {
 831            return false;
 832        }
 833        if worktree_settings.is_path_excluded(&entry.path) {
 834            return false;
 835        }
 836        if !self.query.filters_path() {
 837            return true;
 838        }
 839
 840        let as_abs_path = LazyCell::new(move || snapshot.absolutize(&entry.path));
 841        let entry_path = &entry.path;
 842        // 3. Check Exclusions (Pruning)
 843        // If the current path is a child of an excluded path, we stop.
 844        let is_excluded = self.path_is_definitely_excluded(&entry_path, snapshot);
 845
 846        if is_excluded {
 847            return false;
 848        }
 849
 850        // 4. Check Inclusions (Traversal)
 851        if self.included.is_empty() {
 852            return true;
 853        }
 854
 855        // We scan if the current path is a descendant of an include prefix
 856        // OR if the current path is an ancestor of an include prefix (we need to go deeper to find it).
 857        let is_included = self.included.iter().any(|prefix| {
 858            let (prefix_matches_entry, entry_matches_prefix) = if prefix.is_absolute() {
 859                (
 860                    prefix.starts_with(&**as_abs_path),
 861                    as_abs_path.starts_with(prefix),
 862                )
 863            } else {
 864                RelPath::new(prefix, snapshot.path_style()).map_or((false, false), |prefix| {
 865                    (
 866                        prefix.starts_with(entry_path),
 867                        entry_path.starts_with(&prefix),
 868                    )
 869                })
 870            };
 871
 872            // Logic:
 873            // 1. entry_matches_prefix: We are inside the target zone (e.g. glob: src/, current: src/lib/). Keep scanning.
 874            // 2. prefix_matches_entry: We are above the target zone (e.g. glob: src/foo/, current: src/). Keep scanning to reach foo.
 875            prefix_matches_entry || entry_matches_prefix
 876        });
 877
 878        is_included
 879    }
 880    fn path_is_definitely_excluded(&self, path: &RelPath, snapshot: &Snapshot) -> bool {
 881        if !self.query.files_to_exclude().sources().next().is_none() {
 882            let mut path = if self.query.match_full_paths() {
 883                let mut full_path = snapshot.root_name().to_owned();
 884                full_path.push(path);
 885                full_path
 886            } else {
 887                path.to_owned()
 888            };
 889            loop {
 890                if self.query.files_to_exclude().is_match(&path) {
 891                    return true;
 892                } else if !path.pop() {
 893                    return false;
 894                }
 895            }
 896        } else {
 897            false
 898        }
 899    }
 900}
 901
 902#[cfg(test)]
 903mod tests {
 904    use super::*;
 905    use fs::FakeFs;
 906    use serde_json::json;
 907    use settings::Settings;
 908    use util::{
 909        path,
 910        paths::{PathMatcher, PathStyle},
 911        rel_path::RelPath,
 912    };
 913    use worktree::{Entry, EntryKind, WorktreeSettings};
 914
 915    use crate::{
 916        Project, project_search::PathInclusionMatcher, project_tests::init_test,
 917        search::SearchQuery,
 918    };
 919
 920    #[gpui::test]
 921    async fn test_path_inclusion_matcher(cx: &mut gpui::TestAppContext) {
 922        init_test(cx);
 923
 924        let fs = FakeFs::new(cx.background_executor.clone());
 925        fs.insert_tree(
 926            "/root",
 927            json!({
 928                ".gitignore": "src/data/\n",
 929                "src": {
 930                    "data": {
 931                        "main.csv": "field_1,field_2,field_3",
 932                    },
 933                    "lib": {
 934                        "main.txt": "Are you familiar with fields?",
 935                    },
 936                },
 937            }),
 938        )
 939        .await;
 940
 941        let project = Project::test(fs.clone(), [path!("/root").as_ref()], cx).await;
 942        let worktree = project.update(cx, |project, cx| project.worktrees(cx).next().unwrap());
 943        let (worktree_settings, worktree_snapshot) = worktree.update(cx, |worktree, cx| {
 944            let settings_location = worktree.settings_location(cx);
 945            return (
 946                WorktreeSettings::get(Some(settings_location), cx).clone(),
 947                worktree.snapshot(),
 948            );
 949        });
 950
 951        // Manually create a test entry for the gitignored directory since it won't
 952        // be loaded by the worktree
 953        let entry = Entry {
 954            id: ProjectEntryId::from_proto(1),
 955            kind: EntryKind::UnloadedDir,
 956            path: Arc::from(RelPath::unix(Path::new("src/data")).unwrap()),
 957            inode: 0,
 958            mtime: None,
 959            canonical_path: None,
 960            is_ignored: true,
 961            is_hidden: false,
 962            is_always_included: false,
 963            is_external: false,
 964            is_private: false,
 965            size: 0,
 966            char_bag: Default::default(),
 967            is_fifo: false,
 968        };
 969
 970        // 1. Test searching for `field`, including ignored files without any
 971        // inclusion and exclusion filters.
 972        let include_ignored = true;
 973        let files_to_include = PathMatcher::default();
 974        let files_to_exclude = PathMatcher::default();
 975        let match_full_paths = false;
 976        let search_query = SearchQuery::text(
 977            "field",
 978            false,
 979            false,
 980            include_ignored,
 981            files_to_include,
 982            files_to_exclude,
 983            match_full_paths,
 984            None,
 985        )
 986        .unwrap();
 987
 988        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
 989        assert!(path_matcher.should_scan_gitignored_dir(
 990            &entry,
 991            &worktree_snapshot,
 992            &worktree_settings
 993        ));
 994
 995        // 2. Test searching for `field`, including ignored files but updating
 996        // `files_to_include` to only include files under `src/lib`.
 997        let include_ignored = true;
 998        let files_to_include = PathMatcher::new(vec!["src/lib"], PathStyle::Posix).unwrap();
 999        let files_to_exclude = PathMatcher::default();
1000        let match_full_paths = false;
1001        let search_query = SearchQuery::text(
1002            "field",
1003            false,
1004            false,
1005            include_ignored,
1006            files_to_include,
1007            files_to_exclude,
1008            match_full_paths,
1009            None,
1010        )
1011        .unwrap();
1012
1013        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
1014        assert!(!path_matcher.should_scan_gitignored_dir(
1015            &entry,
1016            &worktree_snapshot,
1017            &worktree_settings
1018        ));
1019    }
1020}