project_search.rs

   1use std::{
   2    cell::LazyCell,
   3    collections::BTreeSet,
   4    io::{BufRead, BufReader},
   5    ops::Range,
   6    path::{Path, PathBuf},
   7    pin::pin,
   8    sync::Arc,
   9    time::Duration,
  10};
  11
  12use anyhow::Context;
  13use collections::HashSet;
  14use fs::Fs;
  15use futures::FutureExt as _;
  16use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
  17use gpui::{App, AppContext, AsyncApp, BackgroundExecutor, Entity, Priority, Task};
  18use language::{Buffer, BufferSnapshot};
  19use parking_lot::Mutex;
  20use postage::oneshot;
  21use rpc::{AnyProtoClient, proto};
  22use smol::channel::{Receiver, Sender, bounded, unbounded};
  23
  24use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
  25use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
  26
  27use crate::{
  28    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
  29    buffer_store::BufferStore,
  30    search::{SearchQuery, SearchResult},
  31    worktree_store::WorktreeStore,
  32};
  33
  34pub struct Search {
  35    buffer_store: Entity<BufferStore>,
  36    worktree_store: Entity<WorktreeStore>,
  37    limit: usize,
  38    kind: SearchKind,
  39}
  40
  41/// Represents search setup, before it is actually kicked off with Search::into_results
  42enum SearchKind {
  43    /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match.
  44    Local {
  45        fs: Arc<dyn Fs>,
  46        worktrees: Vec<Entity<Worktree>>,
  47    },
  48    /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
  49    Remote {
  50        client: AnyProtoClient,
  51        remote_id: u64,
  52        models: Arc<Mutex<RemotelyCreatedModels>>,
  53    },
  54    /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host.
  55    OpenBuffersOnly,
  56}
  57
  58/// Represents results of project search and allows one to either obtain match positions OR
  59/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for
  60/// at most one match in each file.
  61#[must_use]
  62pub struct SearchResultsHandle {
  63    results: Receiver<SearchResult>,
  64    matching_buffers: Receiver<Entity<Buffer>>,
  65    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
  66}
  67
  68pub struct SearchResults<T> {
  69    pub _task_handle: Task<()>,
  70    pub rx: Receiver<T>,
  71}
  72impl SearchResultsHandle {
  73    pub fn results(self, cx: &mut App) -> SearchResults<SearchResult> {
  74        SearchResults {
  75            _task_handle: (self.trigger_search)(cx),
  76            rx: self.results,
  77        }
  78    }
  79    pub fn matching_buffers(self, cx: &mut App) -> SearchResults<Entity<Buffer>> {
  80        SearchResults {
  81            _task_handle: (self.trigger_search)(cx),
  82            rx: self.matching_buffers,
  83        }
  84    }
  85}
  86
  87#[derive(Clone)]
  88enum FindSearchCandidates {
  89    Local {
  90        fs: Arc<dyn Fs>,
  91        /// Start off with all paths in project and filter them based on:
  92        /// - Include filters
  93        /// - Exclude filters
  94        /// - Only open buffers
  95        /// - Scan ignored files
  96        /// Put another way: filter out files that can't match (without looking at file contents)
  97        input_paths_rx: Receiver<InputPath>,
  98        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
  99        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
 100        confirm_contents_will_match_tx: Sender<MatchingEntry>,
 101        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
 102    },
 103    Remote,
 104    OpenBuffersOnly,
 105}
 106
 107impl Search {
 108    pub fn local(
 109        fs: Arc<dyn Fs>,
 110        buffer_store: Entity<BufferStore>,
 111        worktree_store: Entity<WorktreeStore>,
 112        limit: usize,
 113        cx: &mut App,
 114    ) -> Self {
 115        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
 116        Self {
 117            kind: SearchKind::Local { fs, worktrees },
 118            buffer_store,
 119            worktree_store,
 120            limit,
 121        }
 122    }
 123
 124    pub(crate) fn remote(
 125        buffer_store: Entity<BufferStore>,
 126        worktree_store: Entity<WorktreeStore>,
 127        limit: usize,
 128        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
 129    ) -> Self {
 130        Self {
 131            kind: SearchKind::Remote {
 132                client: client_state.0,
 133                remote_id: client_state.1,
 134                models: client_state.2,
 135            },
 136            buffer_store,
 137            worktree_store,
 138            limit,
 139        }
 140    }
 141    pub(crate) fn open_buffers_only(
 142        buffer_store: Entity<BufferStore>,
 143        worktree_store: Entity<WorktreeStore>,
 144        limit: usize,
 145    ) -> Self {
 146        Self {
 147            kind: SearchKind::OpenBuffersOnly,
 148            buffer_store,
 149            worktree_store,
 150            limit,
 151        }
 152    }
 153
 154    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
 155    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
 156    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
 157    /// or full search results.
 158    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
 159        let mut open_buffers = HashSet::default();
 160        let mut unnamed_buffers = Vec::new();
 161        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 162        let buffers = self.buffer_store.read(cx);
 163        for handle in buffers.buffers() {
 164            let buffer = handle.read(cx);
 165            if !buffers.is_searchable(&buffer.remote_id()) {
 166                continue;
 167            } else if let Some(entry_id) = buffer.entry_id(cx) {
 168                open_buffers.insert(entry_id);
 169            } else {
 170                self.limit = self.limit.saturating_sub(1);
 171                unnamed_buffers.push(handle)
 172            };
 173        }
 174        let open_buffers = Arc::new(open_buffers);
 175        let executor = cx.background_executor().clone();
 176        let (tx, rx) = unbounded();
 177        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
 178        let matching_buffers = grab_buffer_snapshot_rx.clone();
 179        let trigger_search = Box::new(move |cx: &mut App| {
 180            cx.spawn(async move |cx| {
 181                for buffer in unnamed_buffers {
 182                    _ = grab_buffer_snapshot_tx.send(buffer).await;
 183                }
 184
 185                let (find_all_matches_tx, find_all_matches_rx) =
 186                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
 187                let query = Arc::new(query);
 188                let (candidate_searcher, tasks) = match self.kind {
 189                    SearchKind::OpenBuffersOnly => {
 190                        let open_buffers = cx.update(|cx| self.all_loaded_buffers(&query, cx));
 191                        let fill_requests = cx
 192                            .background_spawn(async move {
 193                                for buffer in open_buffers {
 194                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
 195                                        return;
 196                                    }
 197                                }
 198                            })
 199                            .boxed_local();
 200                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
 201                    }
 202                    SearchKind::Local {
 203                        fs,
 204                        ref mut worktrees,
 205                    } => {
 206                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
 207                            unbounded();
 208                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
 209                            bounded(64);
 210                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
 211
 212                        let (input_paths_tx, input_paths_rx) = unbounded();
 213                        let tasks = vec![
 214                            cx.spawn(Self::provide_search_paths(
 215                                std::mem::take(worktrees),
 216                                query.clone(),
 217                                input_paths_tx,
 218                                sorted_search_results_tx,
 219                            ))
 220                            .boxed_local(),
 221                            Self::open_buffers(
 222                                self.buffer_store,
 223                                get_buffer_for_full_scan_rx,
 224                                grab_buffer_snapshot_tx,
 225                                cx.clone(),
 226                            )
 227                            .boxed_local(),
 228                            cx.background_spawn(Self::maintain_sorted_search_results(
 229                                sorted_search_results_rx,
 230                                get_buffer_for_full_scan_tx,
 231                                self.limit,
 232                            ))
 233                            .boxed_local(),
 234                        ];
 235                        (
 236                            FindSearchCandidates::Local {
 237                                fs,
 238                                confirm_contents_will_match_tx,
 239                                confirm_contents_will_match_rx,
 240                                input_paths_rx,
 241                            },
 242                            tasks,
 243                        )
 244                    }
 245                    SearchKind::Remote {
 246                        client,
 247                        remote_id,
 248                        models,
 249                    } => {
 250                        let (handle, rx) = self
 251                            .buffer_store
 252                            .update(cx, |this, _| this.register_project_search_result_handle());
 253
 254                        let cancel_ongoing_search = util::defer({
 255                            let client = client.clone();
 256                            move || {
 257                                _ = client.send(proto::FindSearchCandidatesCancelled {
 258                                    project_id: remote_id,
 259                                    handle,
 260                                });
 261                            }
 262                        });
 263                        let request = client.request(proto::FindSearchCandidates {
 264                            project_id: remote_id,
 265                            query: Some(query.to_proto()),
 266                            limit: self.limit as _,
 267                            handle,
 268                        });
 269
 270                        let buffer_store = self.buffer_store;
 271                        let guard = cx.update(|cx| {
 272                            Project::retain_remotely_created_models_impl(
 273                                &models,
 274                                &buffer_store,
 275                                &self.worktree_store,
 276                                cx,
 277                            )
 278                        });
 279
 280                        let issue_remote_buffers_request = cx
 281                            .spawn(async move |cx| {
 282                                let _ = maybe!(async move {
 283                                    request.await?;
 284
 285                                    let (buffer_tx, buffer_rx) = bounded(24);
 286
 287                                    let wait_for_remote_buffers = cx.spawn(async move |cx| {
 288                                        while let Ok(buffer_id) = rx.recv().await {
 289                                            let buffer =
 290                                                buffer_store.update(cx, |buffer_store, cx| {
 291                                                    buffer_store
 292                                                        .wait_for_remote_buffer(buffer_id, cx)
 293                                                });
 294                                            buffer_tx.send(buffer).await?;
 295                                        }
 296                                        anyhow::Ok(())
 297                                    });
 298
 299                                    let forward_buffers = cx.background_spawn(async move {
 300                                        while let Ok(buffer) = buffer_rx.recv().await {
 301                                            let _ =
 302                                                grab_buffer_snapshot_tx.send(buffer.await?).await;
 303                                        }
 304                                        anyhow::Ok(())
 305                                    });
 306                                    let (left, right) = futures::future::join(
 307                                        wait_for_remote_buffers,
 308                                        forward_buffers,
 309                                    )
 310                                    .await;
 311                                    left?;
 312                                    right?;
 313
 314                                    drop(guard);
 315                                    cancel_ongoing_search.abort();
 316                                    anyhow::Ok(())
 317                                })
 318                                .await
 319                                .log_err();
 320                            })
 321                            .boxed_local();
 322                        (
 323                            FindSearchCandidates::Remote,
 324                            vec![issue_remote_buffers_request],
 325                        )
 326                    }
 327                };
 328
 329                let should_find_all_matches = !tx.is_closed();
 330
 331                let _executor = executor.clone();
 332                let worker_pool = executor.spawn(async move {
 333                    let num_cpus = _executor.num_cpus();
 334
 335                    assert!(num_cpus > 0);
 336                    _executor
 337                        .scoped(|scope| {
 338                            for _ in 0..num_cpus - 1 {
 339                                let worker = Worker {
 340                                    query: query.clone(),
 341                                    open_buffers: open_buffers.clone(),
 342                                    candidates: candidate_searcher.clone(),
 343                                    find_all_matches_rx: find_all_matches_rx.clone(),
 344                                };
 345                                scope.spawn(worker.run());
 346                            }
 347
 348                            drop(find_all_matches_rx);
 349                            drop(candidate_searcher);
 350                        })
 351                        .await;
 352                });
 353
 354                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
 355                // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
 356                // just the files. *They are using the same stream as the guts of the project search do*.
 357                // This means that we cannot grab values off of that stream unless it's strictly needed for making a progress in project search.
 358                //
 359                // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
 360                // in all matches, running that task unconditionally would hinder caller's ability to observe all matching file paths.
 361                let buffer_snapshots = if should_find_all_matches {
 362                    Some(
 363                        Self::grab_buffer_snapshots(
 364                            grab_buffer_snapshot_rx,
 365                            find_all_matches_tx,
 366                            sorted_matches_tx,
 367                            cx.clone(),
 368                        )
 369                        .boxed_local(),
 370                    )
 371                } else {
 372                    drop(find_all_matches_tx);
 373
 374                    None
 375                };
 376                let ensure_matches_are_reported_in_order = if should_find_all_matches {
 377                    Some(
 378                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
 379                            .boxed_local(),
 380                    )
 381                } else {
 382                    drop(tx);
 383                    None
 384                };
 385
 386                futures::future::join_all(
 387                    [worker_pool.boxed_local()]
 388                        .into_iter()
 389                        .chain(buffer_snapshots)
 390                        .chain(ensure_matches_are_reported_in_order)
 391                        .chain(tasks),
 392                )
 393                .await;
 394            })
 395        });
 396
 397        SearchResultsHandle {
 398            results: rx,
 399            matching_buffers,
 400            trigger_search,
 401        }
 402    }
 403
 404    fn provide_search_paths(
 405        worktrees: Vec<Entity<Worktree>>,
 406        query: Arc<SearchQuery>,
 407        tx: Sender<InputPath>,
 408        results: Sender<oneshot::Receiver<ProjectPath>>,
 409    ) -> impl AsyncFnOnce(&mut AsyncApp) {
 410        async move |cx| {
 411            _ = maybe!(async move {
 412                let gitignored_tracker = PathInclusionMatcher::new(query.clone());
 413                let include_ignored = query.include_ignored();
 414                for worktree in worktrees {
 415                    let (mut snapshot, worktree_settings) = worktree
 416                        .read_with(cx, |this, _| {
 417                            Some((this.snapshot(), this.as_local()?.settings()))
 418                        })
 419                        .context("The worktree is not local")?;
 420                    if query.include_ignored() {
 421                        // Pre-fetch all of the ignored directories as they're going to be searched.
 422                        let mut entries_to_refresh = vec![];
 423
 424                        for entry in snapshot.entries(query.include_ignored(), 0) {
 425                            if gitignored_tracker.should_scan_gitignored_dir(
 426                                entry,
 427                                &snapshot,
 428                                &worktree_settings,
 429                            ) {
 430                                entries_to_refresh.push(entry.path.clone());
 431                            }
 432                        }
 433                        let barrier = worktree.update(cx, |this, _| {
 434                            let local = this.as_local_mut()?;
 435                            let barrier = entries_to_refresh
 436                                .into_iter()
 437                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
 438                                .collect::<Vec<_>>();
 439                            Some(barrier)
 440                        });
 441                        if let Some(barriers) = barrier {
 442                            futures::future::join_all(barriers).await;
 443                        }
 444                        snapshot = worktree.read_with(cx, |this, _| this.snapshot());
 445                    }
 446                    let tx = tx.clone();
 447                    let results = results.clone();
 448
 449                    cx.background_executor()
 450                        .spawn(async move {
 451                            for entry in snapshot.files(include_ignored, 0) {
 452                                let (should_scan_tx, should_scan_rx) = oneshot::channel();
 453
 454                                let Ok(_) = tx
 455                                    .send(InputPath {
 456                                        entry: entry.clone(),
 457                                        snapshot: snapshot.clone(),
 458                                        should_scan_tx,
 459                                    })
 460                                    .await
 461                                else {
 462                                    return;
 463                                };
 464                                if results.send(should_scan_rx).await.is_err() {
 465                                    return;
 466                                };
 467                            }
 468                        })
 469                        .await;
 470                }
 471                anyhow::Ok(())
 472            })
 473            .await;
 474        }
 475    }
 476
 477    async fn maintain_sorted_search_results(
 478        rx: Receiver<oneshot::Receiver<ProjectPath>>,
 479        paths_for_full_scan: Sender<ProjectPath>,
 480        limit: usize,
 481    ) {
 482        let mut rx = pin!(rx);
 483        let mut matched = 0;
 484        while let Some(mut next_path_result) = rx.next().await {
 485            let Some(successful_path) = next_path_result.next().await else {
 486                // This file did not produce a match, hence skip it.
 487                continue;
 488            };
 489            if paths_for_full_scan.send(successful_path).await.is_err() {
 490                return;
 491            };
 492            matched += 1;
 493            if matched >= limit {
 494                break;
 495            }
 496        }
 497    }
 498
 499    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
 500    async fn open_buffers(
 501        buffer_store: Entity<BufferStore>,
 502        rx: Receiver<ProjectPath>,
 503        find_all_matches_tx: Sender<Entity<Buffer>>,
 504        mut cx: AsyncApp,
 505    ) {
 506        let mut rx = pin!(rx.ready_chunks(64));
 507        _ = maybe!(async move {
 508            while let Some(requested_paths) = rx.next().await {
 509                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
 510                    requested_paths
 511                        .into_iter()
 512                        .map(|path| this.open_buffer(path, cx))
 513                        .collect::<FuturesOrdered<_>>()
 514                });
 515
 516                while let Some(buffer) = buffers.next().await {
 517                    if let Some(buffer) = buffer.log_err() {
 518                        find_all_matches_tx.send(buffer).await?;
 519                    }
 520                }
 521            }
 522            Result::<_, anyhow::Error>::Ok(())
 523        })
 524        .await;
 525    }
 526
 527    async fn grab_buffer_snapshots(
 528        rx: Receiver<Entity<Buffer>>,
 529        find_all_matches_tx: Sender<(
 530            Entity<Buffer>,
 531            BufferSnapshot,
 532            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 533        )>,
 534        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 535        mut cx: AsyncApp,
 536    ) {
 537        _ = maybe!(async move {
 538            while let Ok(buffer) = rx.recv().await {
 539                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot());
 540                let (tx, rx) = oneshot::channel();
 541                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
 542                results.send(rx).await?;
 543            }
 544            debug_assert!(rx.is_empty());
 545            Result::<_, anyhow::Error>::Ok(())
 546        })
 547        .await;
 548    }
 549
 550    async fn ensure_matched_ranges_are_reported_in_order(
 551        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 552        tx: Sender<SearchResult>,
 553    ) {
 554        use postage::stream::Stream;
 555        _ = maybe!(async move {
 556            let mut matched_buffers = 0;
 557            let mut matches = 0;
 558            while let Ok(mut next_buffer_matches) = rx.recv().await {
 559                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
 560                    continue;
 561                };
 562
 563                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
 564                    || matches > Search::MAX_SEARCH_RESULT_RANGES
 565                {
 566                    _ = tx.send(SearchResult::LimitReached).await;
 567                    break;
 568                }
 569                matched_buffers += 1;
 570                matches += ranges.len();
 571
 572                _ = tx.send(SearchResult::Buffer { buffer, ranges }).await?;
 573            }
 574            anyhow::Ok(())
 575        })
 576        .await;
 577    }
 578
 579    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
 580        let worktree_store = self.worktree_store.read(cx);
 581        let mut buffers = search_query
 582            .buffers()
 583            .into_iter()
 584            .flatten()
 585            .filter(|buffer| {
 586                let b = buffer.read(cx);
 587                if let Some(file) = b.file() {
 588                    if !search_query.match_path(file.path()) {
 589                        return false;
 590                    }
 591                    if !search_query.include_ignored()
 592                        && let Some(entry) = b
 593                            .entry_id(cx)
 594                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
 595                        && entry.is_ignored
 596                    {
 597                        return false;
 598                    }
 599                }
 600                true
 601            })
 602            .cloned()
 603            .collect::<Vec<_>>();
 604        buffers.sort_by(|a, b| {
 605            let a = a.read(cx);
 606            let b = b.read(cx);
 607            match (a.file(), b.file()) {
 608                (None, None) => a.remote_id().cmp(&b.remote_id()),
 609                (None, Some(_)) => std::cmp::Ordering::Less,
 610                (Some(_), None) => std::cmp::Ordering::Greater,
 611                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
 612            }
 613        });
 614
 615        buffers
 616    }
 617}
 618
 619struct Worker {
 620    query: Arc<SearchQuery>,
 621    open_buffers: Arc<HashSet<ProjectEntryId>>,
 622    candidates: FindSearchCandidates,
 623    /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
 624    /// Then, when you're done, share them via the channel you were given.
 625    find_all_matches_rx: Receiver<(
 626        Entity<Buffer>,
 627        BufferSnapshot,
 628        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 629    )>,
 630}
 631
impl Worker {
    /// Runs one search worker until every queue it listens to has been closed.
    ///
    /// The worker multiplexes three queues, polled in this priority order by `select_biased!`:
    /// 1. `find_all_matches` — buffer snapshots to scan for every match,
    /// 2. `find_first_match` — not-yet-loaded files whose on-disk contents are checked for at
    ///    least one match,
    /// 3. `scan_path` — candidate paths to filter.
    ///
    /// For `Remote` and `OpenBuffersOnly` searches only the first queue is live; the other two
    /// are replaced with channels whose other half is dropped immediately, and no `fs` is set.
    async fn run(self) {
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            fs,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                fs,
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                Some(fs),
            ),
            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => {
                // Receivers whose senders are dropped right away (so those branches complete
                // immediately) and a sender whose receiver is dropped.
                (unbounded().1, unbounded().1, unbounded().0, None)
            }
        };
        // NOTE(review): the example below appears to illustrate why workers pull from shared
        // queues — a slow request on one worker doesn't hold up requests others can finish:
        // WorkerA: grabs a request for "find all matches in file/a" <- takes 5 minutes
        // right after: WorkerB: grabs a request for "find all matches in file/b" <- takes 5 seconds
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: &self.query,
                open_entries: &self.open_buffers,
                fs: fs.as_deref(),
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
            };
            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
            // steps straight away. Another worker might be about to produce a value that will
            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
            // That way, we'll only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    // `None`: the stream is exhausted; being fused, this branch is no longer
                    // polled on later iterations.
                    let Some(matches) = find_all_matches else {
                        continue;
                    };
                    handler.handle_find_all_matches(matches).await;
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // No more input paths: drop this worker's sender for the next stage
                        // (the content-check queue). The queue closes once the last worker
                        // does the same.
                        confirm_contents_will_match_tx = bounded(1).0;
                    }
                }
                // Every branch is terminated: all queues this worker serves are closed.
                complete => {
                    break
                },
            }
        }
    }
}
 701
 702struct RequestHandler<'worker> {
 703    query: &'worker SearchQuery,
 704    fs: Option<&'worker dyn Fs>,
 705    open_entries: &'worker HashSet<ProjectEntryId>,
 706    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
 707}
 708
 709impl RequestHandler<'_> {
 710    async fn handle_find_all_matches(
 711        &self,
 712        (buffer, snapshot, mut report_matches): (
 713            Entity<Buffer>,
 714            BufferSnapshot,
 715            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 716        ),
 717    ) {
 718        let ranges = self
 719            .query
 720            .search(&snapshot, None)
 721            .await
 722            .iter()
 723            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
 724            .collect::<Vec<_>>();
 725
 726        _ = report_matches.send((buffer, ranges)).await;
 727    }
 728
 729    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
 730        _=maybe!(async move {
 731            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
 732            let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
 733                return anyhow::Ok(());
 734            };
 735
 736            let mut file = BufReader::new(file);
 737            let file_start = file.fill_buf()?;
 738
 739            if let Err(Some(starting_position)) =
 740            std::str::from_utf8(file_start).map_err(|e| e.error_len())
 741            {
 742                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
 743                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
 744                log::debug!(
 745                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
 746                );
 747                return Ok(());
 748            }
 749
 750            if self.query.detect(file).await.unwrap_or(false) {
 751                // Yes, we should scan the whole file.
 752                entry.should_scan_tx.send(entry.path).await?;
 753            }
 754            Ok(())
 755        }).await;
 756    }
 757
 758    async fn handle_scan_path(&self, req: InputPath) {
 759        _ = maybe!(async move {
 760            let InputPath {
 761                entry,
 762                snapshot,
 763                mut should_scan_tx,
 764            } = req;
 765
 766            if entry.is_fifo || !entry.is_file() {
 767                return Ok(());
 768            }
 769
 770            if self.query.filters_path() {
 771                let matched_path = if self.query.match_full_paths() {
 772                    let mut full_path = snapshot.root_name().to_owned();
 773                    full_path.push(&entry.path);
 774                    self.query.match_path(&full_path)
 775                } else {
 776                    self.query.match_path(&entry.path)
 777                };
 778                if !matched_path {
 779                    return Ok(());
 780                }
 781            }
 782
 783            if self.open_entries.contains(&entry.id) {
 784                // The buffer is already in memory and that's the version we want to scan;
 785                // hence skip the dilly-dally and look for all matches straight away.
 786                should_scan_tx
 787                    .send(ProjectPath {
 788                        worktree_id: snapshot.id(),
 789                        path: entry.path.clone(),
 790                    })
 791                    .await?;
 792            } else {
 793                self.confirm_contents_will_match_tx
 794                    .send(MatchingEntry {
 795                        should_scan_tx: should_scan_tx,
 796                        worktree_root: snapshot.abs_path().clone(),
 797                        path: ProjectPath {
 798                            worktree_id: snapshot.id(),
 799                            path: entry.path.clone(),
 800                        },
 801                    })
 802                    .await?;
 803            }
 804
 805            anyhow::Ok(())
 806        })
 807        .await;
 808    }
 809}
 810
/// A worktree entry awaiting path-level filtering in `RequestHandler::handle_scan_path`.
struct InputPath {
    /// The candidate entry.
    entry: Entry,
    /// Snapshot of the worktree containing `entry`; supplies its id, root name and abs path.
    snapshot: Snapshot,
    /// Used to send the entry's `ProjectPath` onward once it is known to need a full scan
    /// (either directly, or after being handed to a `MatchingEntry`).
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
 816
/// A file whose on-disk contents must be checked for at least one match before it is scanned
/// in full (see `RequestHandler::handle_find_first_match`).
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path.path` to locate the file on disk.
    worktree_root: Arc<Path>,
    /// Worktree id and worktree-relative path of the file.
    path: ProjectPath,
    /// Used to send `path` onward when the file's contents contain a match.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
 822
 823/// This struct encapsulates the logic to decide whether a given gitignored directory should be
 824/// scanned based on include/exclude patterns of a search query (as include/exclude parameters may match paths inside it).
 825/// It is kind-of doing an inverse of glob. Given a glob pattern like `src/**/` and a parent path like `src`, we need to decide whether the parent
 826/// may contain glob hits.
 827struct PathInclusionMatcher {
 828    included: BTreeSet<PathBuf>,
 829    query: Arc<SearchQuery>,
 830}
 831
 832impl PathInclusionMatcher {
 833    fn new(query: Arc<SearchQuery>) -> Self {
 834        let mut included = BTreeSet::new();
 835        // To do an inverse glob match, we split each glob into it's prefix and the glob part.
 836        // For example, `src/**/*.rs` becomes `src/` and `**/*.rs`. The glob part gets dropped.
 837        // Then, when checking whether a given directory should be scanned, we check whether it is a non-empty substring of any glob prefix.
 838        if query.filters_path() {
 839            included.extend(
 840                query
 841                    .files_to_include()
 842                    .sources()
 843                    .flat_map(|glob| Some(wax::Glob::new(glob).ok()?.partition().0)),
 844            );
 845        }
 846        Self { included, query }
 847    }
 848
 849    fn should_scan_gitignored_dir(
 850        &self,
 851        entry: &Entry,
 852        snapshot: &Snapshot,
 853        worktree_settings: &WorktreeSettings,
 854    ) -> bool {
 855        if !entry.is_ignored || !entry.kind.is_unloaded() {
 856            return false;
 857        }
 858        if !self.query.include_ignored() {
 859            return false;
 860        }
 861        if worktree_settings.is_path_excluded(&entry.path) {
 862            return false;
 863        }
 864        if !self.query.filters_path() {
 865            return true;
 866        }
 867
 868        let as_abs_path = LazyCell::new(move || snapshot.absolutize(&entry.path));
 869        let entry_path = &entry.path;
 870        // 3. Check Exclusions (Pruning)
 871        // If the current path is a child of an excluded path, we stop.
 872        let is_excluded = self.path_is_definitely_excluded(&entry_path, snapshot);
 873
 874        if is_excluded {
 875            return false;
 876        }
 877
 878        // 4. Check Inclusions (Traversal)
 879        if self.included.is_empty() {
 880            return true;
 881        }
 882
 883        // We scan if the current path is a descendant of an include prefix
 884        // OR if the current path is an ancestor of an include prefix (we need to go deeper to find it).
 885        let is_included = self.included.iter().any(|prefix| {
 886            let (prefix_matches_entry, entry_matches_prefix) = if prefix.is_absolute() {
 887                (
 888                    prefix.starts_with(&**as_abs_path),
 889                    as_abs_path.starts_with(prefix),
 890                )
 891            } else {
 892                RelPath::new(prefix, snapshot.path_style()).map_or((false, false), |prefix| {
 893                    (
 894                        prefix.starts_with(entry_path),
 895                        entry_path.starts_with(&prefix),
 896                    )
 897                })
 898            };
 899
 900            // Logic:
 901            // 1. entry_matches_prefix: We are inside the target zone (e.g. glob: src/, current: src/lib/). Keep scanning.
 902            // 2. prefix_matches_entry: We are above the target zone (e.g. glob: src/foo/, current: src/). Keep scanning to reach foo.
 903            prefix_matches_entry || entry_matches_prefix
 904        });
 905
 906        is_included
 907    }
 908    fn path_is_definitely_excluded(&self, path: &RelPath, snapshot: &Snapshot) -> bool {
 909        if !self.query.files_to_exclude().sources().next().is_none() {
 910            let mut path = if self.query.match_full_paths() {
 911                let mut full_path = snapshot.root_name().to_owned();
 912                full_path.push(path);
 913                full_path
 914            } else {
 915                path.to_owned()
 916            };
 917            loop {
 918                if self.query.files_to_exclude().is_match(&path) {
 919                    return true;
 920                } else if !path.pop() {
 921                    return false;
 922                }
 923            }
 924        } else {
 925            false
 926        }
 927    }
 928}
 929
/// Payload of a flush request sent to the batching task: `true` (sent by
/// `AdaptiveBatcher::flush`) makes the task send what it has and exit; `false` (sent by the
/// delay timer) only sends the current batch.
type IsTerminating = bool;
/// Adaptive batcher that starts eager (small batches) and grows batch size
/// when items arrive quickly, reducing RPC overhead while preserving low latency
/// for slow streams. The flush delay grows with the number of items produced so far.
pub struct AdaptiveBatcher<T> {
    /// Producer side of the item channel consumed by the batching task.
    items: Sender<T>,
    /// Producer side of the flush-request channel; see `IsTerminating`.
    flush_batch: Sender<IsTerminating>,
    /// Background task assembling batches; awaited by `AdaptiveBatcher::flush`.
    _batch_task: Task<()>,
}
 939
 940impl<T: 'static + Send> AdaptiveBatcher<T> {
 941    pub fn new(cx: &BackgroundExecutor) -> (Self, Receiver<Vec<T>>) {
 942        let (items, rx) = unbounded();
 943        let (batch_tx, batch_rx) = unbounded();
 944        let (flush_batch_tx, flush_batch_rx) = unbounded();
 945        let flush_batch = flush_batch_tx.clone();
 946        let executor = cx.clone();
 947        let _batch_task = cx.spawn_with_priority(gpui::Priority::High, async move {
 948            let mut current_batch = vec![];
 949            let mut items_produced_so_far = 0_u64;
 950
 951            let mut _schedule_flush_after_delay: Option<Task<()>> = None;
 952            let _time_elapsed_since_start_of_search = std::time::Instant::now();
 953            let mut flush = pin!(flush_batch_rx);
 954            let mut terminating = false;
 955            loop {
 956                select_biased! {
 957                    item = rx.recv().fuse() => {
 958                        match item {
 959                            Ok(new_item) => {
 960                                let is_fresh_batch = current_batch.is_empty();
 961                                items_produced_so_far += 1;
 962                                current_batch.push(new_item);
 963                                if is_fresh_batch {
 964                                    // Chosen arbitrarily based on some experimentation with plots.
 965                                    let desired_duration_ms = (20 * (items_produced_so_far + 2).ilog2() as u64).min(300);
 966                                    let desired_duration = Duration::from_millis(desired_duration_ms);
 967                                    let _executor = executor.clone();
 968                                    let _flush = flush_batch_tx.clone();
 969                                    let new_timer = executor.spawn_with_priority(Priority::High, async move {
 970                                        _executor.timer(desired_duration).await;
 971                                        _ = _flush.send(false).await;
 972                                    });
 973                                    _schedule_flush_after_delay = Some(new_timer);
 974                                }
 975                            }
 976                            Err(_) => {
 977                                // Items channel closed - send any remaining batch before exiting
 978                                if !current_batch.is_empty() {
 979                                    _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
 980                                }
 981                                break;
 982                            }
 983                        }
 984                    }
 985                    should_break_afterwards = flush.next() => {
 986                        if !current_batch.is_empty() {
 987                            _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
 988                            _schedule_flush_after_delay = None;
 989                        }
 990                        if should_break_afterwards.unwrap_or_default() {
 991                            terminating = true;
 992                        }
 993                    }
 994                    complete => {
 995                        break;
 996                    }
 997                }
 998                if terminating {
 999                    // Drain any remaining items before exiting
1000                    while let Ok(new_item) = rx.try_recv() {
1001                        current_batch.push(new_item);
1002                    }
1003                    if !current_batch.is_empty() {
1004                        _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
1005                    }
1006                    break;
1007                }
1008            }
1009        });
1010        let this = Self {
1011            items,
1012            _batch_task,
1013            flush_batch,
1014        };
1015        (this, batch_rx)
1016    }
1017
1018    pub async fn push(&self, item: T) {
1019        _ = self.items.send(item).await;
1020    }
1021
1022    pub async fn flush(self) {
1023        _ = self.flush_batch.send(true).await;
1024        self._batch_task.await;
1025    }
1026}
1027
#[cfg(test)]
mod tests {
    use super::*;
    use fs::FakeFs;
    use serde_json::json;
    use settings::Settings;
    use util::{
        path,
        paths::{PathMatcher, PathStyle},
        rel_path::RelPath,
    };
    use worktree::{Entry, EntryKind, WorktreeSettings};

    use crate::{
        Project, project_search::PathInclusionMatcher, project_tests::init_test,
        search::SearchQuery,
    };

    /// Exercises `PathInclusionMatcher::should_scan_gitignored_dir` on an unloaded, gitignored
    /// `src/data` directory, with and without an include filter.
    #[gpui::test]
    async fn test_path_inclusion_matcher(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.background_executor.clone());
        // Use `path!` here too so the tree root agrees with the worktree root passed to
        // `Project::test` on every platform.
        fs.insert_tree(
            path!("/root"),
            json!({
                ".gitignore": "src/data/\n",
                "src": {
                    "data": {
                        "main.csv": "field_1,field_2,field_3",
                    },
                    "lib": {
                        "main.txt": "Are you familiar with fields?",
                    },
                },
            }),
        )
        .await;

        let project = Project::test(fs.clone(), [path!("/root").as_ref()], cx).await;
        let worktree = project.update(cx, |project, cx| project.worktrees(cx).next().unwrap());
        let (worktree_settings, worktree_snapshot) = worktree.update(cx, |worktree, cx| {
            let settings_location = worktree.settings_location(cx);
            (
                WorktreeSettings::get(Some(settings_location), cx).clone(),
                worktree.snapshot(),
            )
        });

        // Manually create a test entry for the gitignored directory since it won't
        // be loaded by the worktree
        let entry = Entry {
            id: ProjectEntryId::from_proto(1),
            kind: EntryKind::UnloadedDir,
            path: Arc::from(RelPath::unix(Path::new("src/data")).unwrap()),
            inode: 0,
            mtime: None,
            canonical_path: None,
            is_ignored: true,
            is_hidden: false,
            is_always_included: false,
            is_external: false,
            is_private: false,
            size: 0,
            char_bag: Default::default(),
            is_fifo: false,
        };

        // 1. Test searching for `field`, including ignored files without any
        // inclusion and exclusion filters.
        let include_ignored = true;
        let files_to_include = PathMatcher::default();
        let files_to_exclude = PathMatcher::default();
        let match_full_paths = false;
        let search_query = SearchQuery::text(
            "field",
            false,
            false,
            include_ignored,
            files_to_include,
            files_to_exclude,
            match_full_paths,
            None,
        )
        .unwrap();

        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
        assert!(path_matcher.should_scan_gitignored_dir(
            &entry,
            &worktree_snapshot,
            &worktree_settings
        ));

        // 2. Test searching for `field`, including ignored files but updating
        // `files_to_include` to only include files under `src/lib`.
        let include_ignored = true;
        let files_to_include = PathMatcher::new(vec!["src/lib"], PathStyle::Posix).unwrap();
        let files_to_exclude = PathMatcher::default();
        let match_full_paths = false;
        let search_query = SearchQuery::text(
            "field",
            false,
            false,
            include_ignored,
            files_to_include,
            files_to_exclude,
            match_full_paths,
            None,
        )
        .unwrap();

        let path_matcher = PathInclusionMatcher::new(Arc::new(search_query));
        assert!(!path_matcher.should_scan_gitignored_dir(
            &entry,
            &worktree_snapshot,
            &worktree_settings
        ));
    }
}