project_search.rs

   1use std::{
   2    cell::LazyCell,
   3    collections::BTreeSet,
   4    io::{BufRead, BufReader},
   5    ops::Range,
   6    path::{Path, PathBuf},
   7    pin::pin,
   8    sync::Arc,
   9};
  10
  11use anyhow::Context;
  12use collections::HashSet;
  13use fs::Fs;
  14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
  15use gpui::{App, AppContext, AsyncApp, Entity, Task};
  16use language::{Buffer, BufferSnapshot};
  17use parking_lot::Mutex;
  18use postage::oneshot;
  19use rpc::{AnyProtoClient, proto};
  20use smol::{
  21    channel::{Receiver, Sender, bounded, unbounded},
  22    future::FutureExt,
  23};
  24
  25use text::BufferId;
  26use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
  27use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
  28
  29use crate::{
  30    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
  31    buffer_store::BufferStore,
  32    search::{SearchQuery, SearchResult},
  33    worktree_store::WorktreeStore,
  34};
  35
/// A project-wide search that has been configured but not started yet.
///
/// Create it with `Search::local`, `Search::remote` or `Search::open_buffers_only`, then call
/// `Search::into_handle` to obtain a [`SearchResultsHandle`] that actually runs it.
pub struct Search {
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    /// Cap on the number of files forwarded for a full scan by the local strategy; it is also
    /// sent to the host by the remote strategy. `into_handle` subtracts one for every searchable
    /// buffer that has no worktree entry.
    limit: usize,
    /// Strategy used to discover candidate buffers.
    kind: SearchKind,
}
  42
/// How a [`Search`] discovers candidate buffers. This only describes the setup; nothing runs until
/// the search is kicked off through `Search::into_handle` and the resulting handle is consumed.
enum SearchKind {
    /// Search for candidates by inspecting file contents on the file system, avoiding loading a
    /// buffer unless we know that the file contains at least one match.
    Local {
        fs: Arc<dyn Fs>,
        /// Worktrees to walk; captured when the search is created.
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Query the remote host for candidates. As of writing, the host runs a local search in
    /// "buffers with matches only" mode.
    Remote {
        client: AnyProtoClient,
        /// Id of the remote project, sent as `project_id` in the request.
        remote_id: u64,
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
    /// Run the search against the buffers carried by the query itself. Even when working with a
    /// remote host, this won't round-trip to the host.
    OpenBuffersOnly,
}
  59
/// Handle to a prepared project search. Consume it with exactly one of:
/// - [`SearchResultsHandle::results`], to get every match range in every matching buffer, or
/// - [`SearchResultsHandle::matching_buffers`], to get only the buffers that may match.
///
/// Asking for buffers only is cheaper than obtaining full match positions, because in that case
/// files are checked for at most one match each.
#[must_use]
pub struct SearchResultsHandle {
    /// Receiving end for full search results.
    results: Receiver<SearchResult>,
    /// Receiving end for candidate buffers.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// Starts the search; called exactly once by whichever accessor consumes the handle.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
  69
  70impl SearchResultsHandle {
  71    pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
  72        (self.trigger_search)(cx).detach();
  73        self.results
  74    }
  75    pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
  76        (self.trigger_search)(cx).detach();
  77        self.matching_buffers
  78    }
  79}
  80
/// Candidate-discovery channels handed to every search worker; each worker owns its own clone.
#[derive(Clone)]
enum FindSearchCandidates {
    Local {
        fs: Arc<dyn Fs>,
        /// Every file of the searched worktrees (ignored files only when the query includes them).
        /// Workers filter out, without looking at file contents, entries that can't match:
        /// - non-files and FIFOs,
        /// - paths rejected by the include/exclude filters.
        /// Entries that already have an open buffer skip the on-disk check and go straight to a
        /// full scan.
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we figure out whether it contains at least
        /// one match based on the file's disk contents. This step is not performed for buffers we
        /// already have in memory. Both ends are held by every worker, so the channel only closes
        /// once every holder has dropped its sender.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    },
    /// Candidates arrive as buffers from the remote host; workers only run full-match scans.
    Remote,
    /// Candidates are the query's own buffers; workers only run full-match scans.
    OpenBuffersOnly,
}
 100
 101impl Search {
 102    pub fn local(
 103        fs: Arc<dyn Fs>,
 104        buffer_store: Entity<BufferStore>,
 105        worktree_store: Entity<WorktreeStore>,
 106        limit: usize,
 107        cx: &mut App,
 108    ) -> Self {
 109        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
 110        Self {
 111            kind: SearchKind::Local { fs, worktrees },
 112            buffer_store,
 113            worktree_store,
 114            limit,
 115        }
 116    }
 117
 118    pub(crate) fn remote(
 119        buffer_store: Entity<BufferStore>,
 120        worktree_store: Entity<WorktreeStore>,
 121        limit: usize,
 122        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
 123    ) -> Self {
 124        Self {
 125            kind: SearchKind::Remote {
 126                client: client_state.0,
 127                remote_id: client_state.1,
 128                models: client_state.2,
 129            },
 130            buffer_store,
 131            worktree_store,
 132            limit,
 133        }
 134    }
 135    pub(crate) fn open_buffers_only(
 136        buffer_store: Entity<BufferStore>,
 137        worktree_store: Entity<WorktreeStore>,
 138        limit: usize,
 139    ) -> Self {
 140        Self {
 141            kind: SearchKind::OpenBuffersOnly,
 142            buffer_store,
 143            worktree_store,
 144            limit,
 145        }
 146    }
 147
 148    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 149    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 150    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
 151    /// or full search results.
 152    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
 153        let mut open_buffers = HashSet::default();
 154        let mut unnamed_buffers = Vec::new();
 155        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 156        let buffers = self.buffer_store.read(cx);
 157        for handle in buffers.buffers() {
 158            let buffer = handle.read(cx);
 159            if !buffers.is_searchable(&buffer.remote_id()) {
 160                continue;
 161            } else if let Some(entry_id) = buffer.entry_id(cx) {
 162                open_buffers.insert(entry_id);
 163            } else {
 164                self.limit = self.limit.saturating_sub(1);
 165                unnamed_buffers.push(handle)
 166            };
 167        }
 168        let executor = cx.background_executor().clone();
 169        let (tx, rx) = unbounded();
 170        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
 171        let matching_buffers = grab_buffer_snapshot_rx.clone();
 172        let trigger_search = Box::new(move |cx: &mut App| {
 173            cx.spawn(async move |cx| {
 174                for buffer in unnamed_buffers {
 175                    _ = grab_buffer_snapshot_tx.send(buffer).await;
 176                }
 177
 178                let (find_all_matches_tx, find_all_matches_rx) =
 179                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
 180                let query = Arc::new(query);
 181                let (candidate_searcher, tasks) = match self.kind {
 182                    SearchKind::OpenBuffersOnly => {
 183                        let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx))
 184                        else {
 185                            return;
 186                        };
 187                        let fill_requests = cx
 188                            .background_spawn(async move {
 189                                for buffer in open_buffers {
 190                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
 191                                        return;
 192                                    }
 193                                }
 194                            })
 195                            .boxed_local();
 196                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
 197                    }
 198                    SearchKind::Local {
 199                        fs,
 200                        ref mut worktrees,
 201                    } => {
 202                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 203                            unbounded();
 204                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
 205                            bounded(64);
 206                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
 207
 208                        let (input_paths_tx, input_paths_rx) = unbounded();
 209                        let tasks = vec![
 210                            cx.spawn(Self::provide_search_paths(
 211                                std::mem::take(worktrees),
 212                                query.clone(),
 213                                input_paths_tx,
 214                                sorted_search_results_tx,
 215                            ))
 216                            .boxed_local(),
 217                            Self::open_buffers(
 218                                &self.buffer_store,
 219                                get_buffer_for_full_scan_rx,
 220                                grab_buffer_snapshot_tx,
 221                                cx.clone(),
 222                            )
 223                            .boxed_local(),
 224                            cx.background_spawn(Self::maintain_sorted_search_results(
 225                                sorted_search_results_rx,
 226                                get_buffer_for_full_scan_tx,
 227                                self.limit,
 228                            ))
 229                            .boxed_local(),
 230                        ];
 231                        (
 232                            FindSearchCandidates::Local {
 233                                fs,
 234                                confirm_contents_will_match_tx,
 235                                confirm_contents_will_match_rx,
 236                                input_paths_rx,
 237                            },
 238                            tasks,
 239                        )
 240                    }
 241                    SearchKind::Remote {
 242                        client,
 243                        remote_id,
 244                        models,
 245                    } => {
 246                        let request = client.request(proto::FindSearchCandidates {
 247                            project_id: remote_id,
 248                            query: Some(query.to_proto()),
 249                            limit: self.limit as _,
 250                        });
 251                        let Ok(guard) = cx.update(|cx| {
 252                            Project::retain_remotely_created_models_impl(
 253                                &models,
 254                                &self.buffer_store,
 255                                &self.worktree_store,
 256                                cx,
 257                            )
 258                        }) else {
 259                            return;
 260                        };
 261                        let buffer_store = self.buffer_store.downgrade();
 262                        let issue_remote_buffers_request = cx
 263                            .spawn(async move |cx| {
 264                                let _ = maybe!(async move {
 265                                    let response = request.await?;
 266                                    for buffer_id in response.buffer_ids {
 267                                        let buffer_id = BufferId::new(buffer_id)?;
 268                                        let buffer = buffer_store
 269                                            .update(cx, |buffer_store, cx| {
 270                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
 271                                            })?
 272                                            .await?;
 273                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
 274                                    }
 275
 276                                    drop(guard);
 277                                    anyhow::Ok(())
 278                                })
 279                                .await
 280                                .log_err();
 281                            })
 282                            .boxed_local();
 283                        (
 284                            FindSearchCandidates::Remote,
 285                            vec![issue_remote_buffers_request],
 286                        )
 287                    }
 288                };
 289
 290                let should_find_all_matches = !tx.is_closed();
 291
 292                let worker_pool = executor.scoped(|scope| {
 293                    let num_cpus = executor.num_cpus();
 294
 295                    assert!(num_cpus > 0);
 296                    for _ in 0..executor.num_cpus() - 1 {
 297                        let worker = Worker {
 298                            query: &query,
 299                            open_buffers: &open_buffers,
 300                            candidates: candidate_searcher.clone(),
 301                            find_all_matches_rx: find_all_matches_rx.clone(),
 302                        };
 303                        scope.spawn(worker.run());
 304                    }
 305
 306                    drop(find_all_matches_rx);
 307                    drop(candidate_searcher);
 308                });
 309
 310                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
 311                // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
 312                // just the files. *They are using the same stream as the guts of the project search do*.
 313                // This means that we cannot grab values off of that stream unless it's strictly needed for making a progress in project search.
 314                //
 315                // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
 316                // in all matches, running that task unconditionally would hinder caller's ability to observe all matching file paths.
 317                let buffer_snapshots = if should_find_all_matches {
 318                    Some(
 319                        Self::grab_buffer_snapshots(
 320                            grab_buffer_snapshot_rx,
 321                            find_all_matches_tx,
 322                            sorted_matches_tx,
 323                            cx.clone(),
 324                        )
 325                        .boxed_local(),
 326                    )
 327                } else {
 328                    drop(find_all_matches_tx);
 329
 330                    None
 331                };
 332                let ensure_matches_are_reported_in_order = if should_find_all_matches {
 333                    Some(
 334                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
 335                            .boxed_local(),
 336                    )
 337                } else {
 338                    drop(tx);
 339                    None
 340                };
 341
 342                futures::future::join_all(
 343                    [worker_pool.boxed_local()]
 344                        .into_iter()
 345                        .chain(buffer_snapshots)
 346                        .chain(ensure_matches_are_reported_in_order)
 347                        .chain(tasks),
 348                )
 349                .await;
 350            })
 351        });
 352
 353        SearchResultsHandle {
 354            results: rx,
 355            matching_buffers,
 356            trigger_search,
 357        }
 358    }
 359
 360    fn provide_search_paths(
 361        worktrees: Vec<Entity<Worktree>>,
 362        query: Arc<SearchQuery>,
 363        tx: Sender<InputPath>,
 364        results: Sender<oneshot::Receiver<ProjectPath>>,
 365    ) -> impl AsyncFnOnce(&mut AsyncApp) {
 366        async move |cx| {
 367            _ = maybe!(async move {
 368                let gitignored_tracker = PathInclusionMatcher::new(query.clone());
 369                for worktree in worktrees {
 370                    let (mut snapshot, worktree_settings) = worktree
 371                        .read_with(cx, |this, _| {
 372                            Some((this.snapshot(), this.as_local()?.settings()))
 373                        })?
 374                        .context("The worktree is not local")?;
 375                    if query.include_ignored() {
 376                        // Pre-fetch all of the ignored directories as they're going to be searched.
 377                        let mut entries_to_refresh = vec![];
 378
 379                        for entry in snapshot.entries(query.include_ignored(), 0) {
 380                            if gitignored_tracker.should_scan_gitignored_dir(
 381                                entry,
 382                                &snapshot,
 383                                &worktree_settings,
 384                            ) {
 385                                entries_to_refresh.push(entry.path.clone());
 386                            }
 387                        }
 388                        let barrier = worktree.update(cx, |this, _| {
 389                            let local = this.as_local_mut()?;
 390                            let barrier = entries_to_refresh
 391                                .into_iter()
 392                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
 393                                .collect::<Vec<_>>();
 394                            Some(barrier)
 395                        })?;
 396                        if let Some(barriers) = barrier {
 397                            futures::future::join_all(barriers).await;
 398                        }
 399                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
 400                    }
 401                    cx.background_executor()
 402                        .scoped(|scope| {
 403                            scope.spawn(async {
 404                                for entry in snapshot.files(query.include_ignored(), 0) {
 405                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
 406
 407                                    let Ok(_) = tx
 408                                        .send(InputPath {
 409                                            entry: entry.clone(),
 410                                            snapshot: snapshot.clone(),
 411                                            should_scan_tx,
 412                                        })
 413                                        .await
 414                                    else {
 415                                        return;
 416                                    };
 417                                    if results.send(should_scan_rx).await.is_err() {
 418                                        return;
 419                                    };
 420                                }
 421                            })
 422                        })
 423                        .await;
 424                }
 425                anyhow::Ok(())
 426            })
 427            .await;
 428        }
 429    }
 430
 431    async fn maintain_sorted_search_results(
 432        rx: Receiver<oneshot::Receiver<ProjectPath>>,
 433        paths_for_full_scan: Sender<ProjectPath>,
 434        limit: usize,
 435    ) {
 436        let mut rx = pin!(rx);
 437        let mut matched = 0;
 438        while let Some(mut next_path_result) = rx.next().await {
 439            let Some(successful_path) = next_path_result.next().await else {
 440                // This file did not produce a match, hence skip it.
 441                continue;
 442            };
 443            if paths_for_full_scan.send(successful_path).await.is_err() {
 444                return;
 445            };
 446            matched += 1;
 447            if matched >= limit {
 448                break;
 449            }
 450        }
 451    }
 452
 453    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
 454    async fn open_buffers(
 455        buffer_store: &Entity<BufferStore>,
 456        rx: Receiver<ProjectPath>,
 457        find_all_matches_tx: Sender<Entity<Buffer>>,
 458        mut cx: AsyncApp,
 459    ) {
 460        let mut rx = pin!(rx.ready_chunks(64));
 461        _ = maybe!(async move {
 462            while let Some(requested_paths) = rx.next().await {
 463                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
 464                    requested_paths
 465                        .into_iter()
 466                        .map(|path| this.open_buffer(path, cx))
 467                        .collect::<FuturesOrdered<_>>()
 468                })?;
 469
 470                while let Some(buffer) = buffers.next().await {
 471                    if let Some(buffer) = buffer.log_err() {
 472                        find_all_matches_tx.send(buffer).await?;
 473                    }
 474                }
 475            }
 476            Result::<_, anyhow::Error>::Ok(())
 477        })
 478        .await;
 479    }
 480
 481    async fn grab_buffer_snapshots(
 482        rx: Receiver<Entity<Buffer>>,
 483        find_all_matches_tx: Sender<(
 484            Entity<Buffer>,
 485            BufferSnapshot,
 486            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 487        )>,
 488        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 489        mut cx: AsyncApp,
 490    ) {
 491        _ = maybe!(async move {
 492            while let Ok(buffer) = rx.recv().await {
 493                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
 494                let (tx, rx) = oneshot::channel();
 495                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
 496                results.send(rx).await?;
 497            }
 498            debug_assert!(rx.is_empty());
 499            Result::<_, anyhow::Error>::Ok(())
 500        })
 501        .await;
 502    }
 503
 504    async fn ensure_matched_ranges_are_reported_in_order(
 505        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 506        tx: Sender<SearchResult>,
 507    ) {
 508        use postage::stream::Stream;
 509        _ = maybe!(async move {
 510            let mut matched_buffers = 0;
 511            let mut matches = 0;
 512            while let Ok(mut next_buffer_matches) = rx.recv().await {
 513                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
 514                    continue;
 515                };
 516
 517                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
 518                    || matches > Search::MAX_SEARCH_RESULT_RANGES
 519                {
 520                    _ = tx.send(SearchResult::LimitReached).await;
 521                    break;
 522                }
 523                matched_buffers += 1;
 524                matches += ranges.len();
 525
 526                _ = tx.send(SearchResult::Buffer { buffer, ranges }).await?;
 527            }
 528            anyhow::Ok(())
 529        })
 530        .await;
 531    }
 532
 533    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
 534        let worktree_store = self.worktree_store.read(cx);
 535        let mut buffers = search_query
 536            .buffers()
 537            .into_iter()
 538            .flatten()
 539            .filter(|buffer| {
 540                let b = buffer.read(cx);
 541                if let Some(file) = b.file() {
 542                    if !search_query.match_path(file.path()) {
 543                        return false;
 544                    }
 545                    if !search_query.include_ignored()
 546                        && let Some(entry) = b
 547                            .entry_id(cx)
 548                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
 549                        && entry.is_ignored
 550                    {
 551                        return false;
 552                    }
 553                }
 554                true
 555            })
 556            .cloned()
 557            .collect::<Vec<_>>();
 558        buffers.sort_by(|a, b| {
 559            let a = a.read(cx);
 560            let b = b.read(cx);
 561            match (a.file(), b.file()) {
 562                (None, None) => a.remote_id().cmp(&b.remote_id()),
 563                (None, Some(_)) => std::cmp::Ordering::Less,
 564                (Some(_), None) => std::cmp::Ordering::Greater,
 565                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
 566            }
 567        });
 568
 569        buffers
 570    }
 571}
 572
/// State owned by one background search worker; see `Worker::run`.
struct Worker<'search> {
    /// The query being searched for (the same query is lent to every worker).
    query: &'search SearchQuery,
    /// Worktree entries that already have an open, searchable buffer. Those files skip the
    /// on-disk content check and are scanned from memory.
    open_buffers: &'search HashSet<ProjectEntryId>,
    /// This worker's clone of the candidate-discovery channels.
    candidates: FindSearchCandidates,
    /// Back in the background: run a full scan and find all matches in the given buffer snapshot.
    /// When done, report the buffer and its ranges through the oneshot sender that came with it.
    find_all_matches_rx: Receiver<(
        Entity<Buffer>,
        BufferSnapshot,
        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
    )>,
}
 585
impl Worker<'_> {
    /// Main loop of a background search worker.
    ///
    /// The worker pulls requests from three shared channels and handles one request at a time.
    /// `select_biased!` polls them in a fixed priority order:
    /// 1. full-match scans,
    /// 2. first-match (on-disk) checks,
    /// 3. new paths.
    ///
    /// Presumably this is so that work further down the pipeline drains first. The loop exits
    /// once all three streams have terminated.
    async fn run(self) {
        // Remote and open-buffers-only searches have no path-discovery stages. Each stand-in
        // channel has its other half dropped immediately: the receivers end at once, and the
        // sender has no receiver.
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            fs,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                fs,
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                Some(fs),
            ),
            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => {
                (unbounded().1, unbounded().1, unbounded().0, None)
            }
        };
        // WorkerA: grabs a request for "find all matches in file/a" <- takes 5 minutes
        // right after: WorkerB: grabs a request for "find all matches in file/b" <- takes 5 seconds
        // NOTE(review): presumably this illustrates why workers pull from shared channels instead
        // of being assigned fixed work: a worker stuck on a slow file doesn't hold up the others.
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            // Rebuilt on every iteration because `confirm_contents_will_match_tx` may be
            // reassigned below.
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: fs.as_deref(),
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
            };
            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
            // steps straight away. Another worker might be about to produce a value that will
            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
            // That way, we'll only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    // `None` means the stream just terminated; being fused, it is skipped from now on.
                    let Some(matches) = find_all_matches else {
                        continue;
                    };
                    handler.handle_find_all_matches(matches).await;
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // No more paths will arrive, so this worker releases its real sender by
                        // swapping in a dummy (its receiver is dropped right away). The confirm
                        // channel closes once every holder has released its sender, i.e. when
                        // the last worker gets here.
                        confirm_contents_will_match_tx = bounded(1).0;
                    }

                 }
                 complete => {
                     break
                },

            }
        }
    }
}
 655
/// Borrowed view of a worker's state, used to service a single request.
struct RequestHandler<'worker> {
    /// The query being searched for.
    query: &'worker SearchQuery,
    /// File system used to read candidate files from disk. `None` for the remote and
    /// open-buffers-only strategies, which never hand out on-disk checks.
    fs: Option<&'worker dyn Fs>,
    /// Worktree entries that already have an open buffer; such files skip the on-disk check.
    open_entries: &'worker HashSet<ProjectEntryId>,
    /// Where files that passed the path filters but aren't open yet are sent for the on-disk check.
    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
}
 662
 663impl RequestHandler<'_> {
 664    async fn handle_find_all_matches(
 665        &self,
 666        (buffer, snapshot, mut report_matches): (
 667            Entity<Buffer>,
 668            BufferSnapshot,
 669            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 670        ),
 671    ) {
 672        let ranges = self
 673            .query
 674            .search(&snapshot, None)
 675            .await
 676            .iter()
 677            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
 678            .collect::<Vec<_>>();
 679
 680        _ = report_matches.send((buffer, ranges)).await;
 681    }
 682
 683    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
 684        _=maybe!(async move {
 685            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
 686            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
 687                return anyhow::Ok(());
 688            };
 689
 690            let mut file = BufReader::new(file);
 691            let file_start = file.fill_buf()?;
 692
 693            if let Err(Some(starting_position)) =
 694            std::str::from_utf8(file_start).map_err(|e| e.error_len())
 695            {
 696                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
 697                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
 698                log::debug!(
 699                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
 700                );
 701                return Ok(());
 702            }
 703
 704            if self.query.detect(file).unwrap_or(false) {
 705                // Yes, we should scan the whole file.
 706                entry.should_scan_tx.send(entry.path).await?;
 707            }
 708            Ok(())
 709        }).await;
 710    }
 711
 712    async fn handle_scan_path(&self, req: InputPath) {
 713        _ = maybe!(async move {
 714            let InputPath {
 715                entry,
 716                snapshot,
 717                mut should_scan_tx,
 718            } = req;
 719
 720            if entry.is_fifo || !entry.is_file() {
 721                return Ok(());
 722            }
 723
 724            if self.query.filters_path() {
 725                let matched_path = if self.query.match_full_paths() {
 726                    let mut full_path = snapshot.root_name().to_owned();
 727                    full_path.push(&entry.path);
 728                    self.query.match_path(&full_path)
 729                } else {
 730                    self.query.match_path(&entry.path)
 731                };
 732                if !matched_path {
 733                    return Ok(());
 734                }
 735            }
 736
 737            if self.open_entries.contains(&entry.id) {
 738                // The buffer is already in memory and that's the version we want to scan;
 739                // hence skip the dilly-dally and look for all matches straight away.
 740                should_scan_tx
 741                    .send(ProjectPath {
 742                        worktree_id: snapshot.id(),
 743                        path: entry.path.clone(),
 744                    })
 745                    .await?;
 746            } else {
 747                self.confirm_contents_will_match_tx
 748                    .send(MatchingEntry {
 749                        should_scan_tx: should_scan_tx,
 750                        worktree_root: snapshot.abs_path().clone(),
 751                        path: ProjectPath {
 752                            worktree_id: snapshot.id(),
 753                            path: entry.path.clone(),
 754                        },
 755                    })
 756                    .await?;
 757            }
 758
 759            anyhow::Ok(())
 760        })
 761        .await;
 762    }
 763}
 764
/// A worktree entry to be filtered by `handle_scan_path`, together with the channel through
/// which it is handed on for a full scan.
struct InputPath {
    /// The worktree entry under consideration.
    entry: Entry,
    /// Snapshot of the worktree that `entry` belongs to; used for the worktree id, root name and
    /// absolute root path.
    snapshot: Snapshot,
    /// Receives the entry's `ProjectPath` once it is decided that the file should be scanned.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
 770
/// A file whose path passed `handle_scan_path`'s filters but which is not open in memory; its
/// on-disk contents are checked by `handle_find_first_match` before it is scanned in full.
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path.path` to locate the file on disk.
    worktree_root: Arc<Path>,
    /// Worktree id and worktree-relative path of the file.
    path: ProjectPath,
    /// Receives `path` if the query is detected in the file's contents.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
 776
/// This struct encapsulates the logic to decide whether a given gitignored directory should be
/// scanned based on the include/exclude patterns of a search query (as include/exclude
/// patterns may match paths inside it).
///
/// It performs a kind of inverse glob match: given a glob pattern like `src/**/` and a parent
/// path like `src`, we need to decide whether the parent may contain glob hits.
struct PathInclusionMatcher {
    /// Literal (wildcard-free) prefixes of the query's include globs, as produced by
    /// `wax::Glob::partition`.
    included: BTreeSet<PathBuf>,
    /// The search query whose filters are being inverted.
    query: Arc<SearchQuery>,
}
 785
 786impl PathInclusionMatcher {
 787    fn new(query: Arc<SearchQuery>) -> Self {
 788        let mut included = BTreeSet::new();
 789        // To do an inverse glob match, we split each glob into it's prefix and the glob part.
 790        // For example, `src/**/*.rs` becomes `src/` and `**/*.rs`. The glob part gets dropped.
 791        // Then, when checking whether a given directory should be scanned, we check whether it is a non-empty substring of any glob prefix.
 792        if query.filters_path() {
 793            included.extend(
 794                query
 795                    .files_to_include()
 796                    .sources()
 797                    .flat_map(|glob| Some(wax::Glob::new(glob).ok()?.partition().0)),
 798            );
 799        }
 800        Self { included, query }
 801    }
 802
 803    fn should_scan_gitignored_dir(
 804        &self,
 805        entry: &Entry,
 806        snapshot: &Snapshot,
 807        worktree_settings: &WorktreeSettings,
 808    ) -> bool {
 809        if !entry.is_ignored || !entry.kind.is_unloaded() {
 810            return false;
 811        }
 812        if !self.query.include_ignored() {
 813            return false;
 814        }
 815        if worktree_settings.is_path_excluded(&entry.path) {
 816            return false;
 817        }
 818        if !self.query.filters_path() {
 819            return true;
 820        }
 821
 822        let as_abs_path = LazyCell::new(move || snapshot.absolutize(&entry.path));
 823        let entry_path = &entry.path;
 824        // 3. Check Exclusions (Pruning)
 825        // If the current path is a child of an excluded path, we stop.
 826        let is_excluded = self.path_is_definitely_excluded(&entry_path, snapshot);
 827
 828        if is_excluded {
 829            return false;
 830        }
 831
 832        // 4. Check Inclusions (Traversal)
 833        if self.included.is_empty() {
 834            return true;
 835        }
 836
 837        // We scan if the current path is a descendant of an include prefix
 838        // OR if the current path is an ancestor of an include prefix (we need to go deeper to find it).
 839        let is_included = self.included.iter().any(|prefix| {
 840            let (prefix_matches_entry, entry_matches_prefix) = if prefix.is_absolute() {
 841                (
 842                    prefix.starts_with(&**as_abs_path),
 843                    as_abs_path.starts_with(prefix),
 844                )
 845            } else {
 846                RelPath::new(prefix, snapshot.path_style()).map_or((false, false), |prefix| {
 847                    (
 848                        prefix.starts_with(entry_path),
 849                        entry_path.starts_with(&prefix),
 850                    )
 851                })
 852            };
 853
 854            // Logic:
 855            // 1. entry_matches_prefix: We are inside the target zone (e.g. glob: src/, current: src/lib/). Keep scanning.
 856            // 2. prefix_matches_entry: We are above the target zone (e.g. glob: src/foo/, current: src/). Keep scanning to reach foo.
 857            prefix_matches_entry || entry_matches_prefix
 858        });
 859
 860        is_included
 861    }
 862    fn path_is_definitely_excluded(&self, path: &RelPath, snapshot: &Snapshot) -> bool {
 863        if !self.query.files_to_exclude().sources().next().is_none() {
 864            let mut path = if self.query.match_full_paths() {
 865                let mut full_path = snapshot.root_name().to_owned();
 866                full_path.push(path);
 867                full_path
 868            } else {
 869                path.to_owned()
 870            };
 871            loop {
 872                if self.query.files_to_exclude().is_match(&path) {
 873                    return true;
 874                } else if !path.pop() {
 875                    return false;
 876                }
 877            }
 878        } else {
 879            false
 880        }
 881    }
 882}
 883
 884#[cfg(test)]
 885mod tests {
 886    use super::*;
 887    use fs::FakeFs;
 888    use serde_json::json;
 889    use settings::Settings;
 890    use util::{
 891        path,
 892        paths::{PathMatcher, PathStyle},
 893        rel_path::RelPath,
 894    };
 895    use worktree::{Entry, EntryKind, WorktreeSettings};
 896
 897    use crate::{
 898        Project, project_search::PathInclusionMatcher, project_tests::init_test,
 899        search::SearchQuery,
 900    };
 901
 902    #[gpui::test]
 903    async fn test_path_inclusion_matcher(cx: &mut gpui::TestAppContext) {
 904        init_test(cx);
 905
 906        let fs = FakeFs::new(cx.background_executor.clone());
 907        fs.insert_tree(
 908            "/root",
 909            json!({
 910                ".gitignore": "src/data/\n",
 911                "src": {
 912                    "data": {
 913                        "main.csv": "field_1,field_2,field_3",
 914                    },
 915                    "lib": {
 916                        "main.txt": "Are you familiar with fields?",
 917                    },
 918                },
 919            }),
 920        )
 921        .await;
 922
 923        let project = Project::test(fs.clone(), [path!("/root").as_ref()], cx).await;
 924        let worktree = project.update(cx, |project, cx| project.worktrees(cx).next().unwrap());
 925        let (worktree_settings, worktree_snapshot) = worktree.update(cx, |worktree, cx| {
 926            let settings_location = worktree.settings_location(cx);
 927            return (
 928                WorktreeSettings::get(Some(settings_location), cx).clone(),
 929                worktree.snapshot(),
 930            );
 931        });
 932
 933        // Manually create a test entry for the gitignored directory since it won't
 934        // be loaded by the worktree
 935        let entry = Entry {
 936            id: ProjectEntryId::from_proto(1),
 937            kind: EntryKind::UnloadedDir,
 938            path: Arc::from(RelPath::unix(Path::new("src/data")).unwrap()),
 939            inode: 0,
 940            mtime: None,
 941            canonical_path: None,
 942            is_ignored: true,
 943            is_hidden: false,
 944            is_always_included: false,
 945            is_external: false,
 946            is_private: false,
 947            size: 0,
 948            char_bag: Default::default(),
 949            is_fifo: false,
 950        };
 951
 952        // 1. Test searching for `field`, including ignored files without any
 953        // inclusion and exclusion filters.
 954        let include_ignored = true;
 955        let files_to_include = PathMatcher::default();
 956        let files_to_exclude = PathMatcher::default();
 957        let match_full_paths = false;
 958        let search_query = SearchQuery::text(
 959            "field",
 960            false,
 961            false,
 962            include_ignored,
 963            files_to_include,
 964            files_to_exclude,
 965            match_full_paths,
 966            None,
 967        )
 968        .unwrap();
 969
 970        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
 971        assert!(path_matcher.should_scan_gitignored_dir(
 972            &entry,
 973            &worktree_snapshot,
 974            &worktree_settings
 975        ));
 976
 977        // 2. Test searching for `field`, including ignored files but updating
 978        // `files_to_include` to only include files under `src/lib`.
 979        let include_ignored = true;
 980        let files_to_include = PathMatcher::new(vec!["src/lib"], PathStyle::Posix).unwrap();
 981        let files_to_exclude = PathMatcher::default();
 982        let match_full_paths = false;
 983        let search_query = SearchQuery::text(
 984            "field",
 985            false,
 986            false,
 987            include_ignored,
 988            files_to_include,
 989            files_to_exclude,
 990            match_full_paths,
 991            None,
 992        )
 993        .unwrap();
 994
 995        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
 996        assert!(!path_matcher.should_scan_gitignored_dir(
 997            &entry,
 998            &worktree_snapshot,
 999            &worktree_settings
1000        ));
1001    }
1002}