project_search.rs

   1use std::{
   2    cell::LazyCell,
   3    collections::BTreeSet,
   4    io::{BufRead, BufReader},
   5    ops::Range,
   6    path::{Path, PathBuf},
   7    pin::pin,
   8    sync::Arc,
   9    time::Duration,
  10};
  11
  12use anyhow::Context;
  13use collections::HashSet;
  14use fs::Fs;
  15use futures::FutureExt as _;
  16use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
  17use gpui::{App, AppContext, AsyncApp, BackgroundExecutor, Entity, Priority, Task};
  18use language::{Buffer, BufferSnapshot};
  19use parking_lot::Mutex;
  20use postage::oneshot;
  21use rpc::{AnyProtoClient, proto};
  22use smol::channel::{Receiver, Sender, bounded, unbounded};
  23
  24use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
  25use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
  26
  27use crate::{
  28    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
  29    buffer_store::BufferStore,
  30    search::{SearchQuery, SearchResult},
  31    worktree_store::WorktreeStore,
  32};
  33
/// A project-wide search that has been configured but not started yet.
///
/// Build one with [`Search::local`] (or the crate-internal `remote` / `open_buffers_only`
/// constructors), then call [`Search::into_handle`] to obtain a [`SearchResultsHandle`].
pub struct Search {
    /// Store used to enumerate open buffers and to open buffers for candidate paths.
    buffer_store: Entity<BufferStore>,
    /// Worktree store; consulted for entry metadata (e.g. whether an entry is ignored) and used
    /// when retaining remotely created models for a remote search.
    worktree_store: Entity<WorktreeStore>,
    /// Cap on the number of candidate files: forwarded for a full scan in local mode, sent to
    /// the host in remote mode. `into_handle` reduces it by one per unnamed open buffer.
    limit: usize,
    /// Strategy used to discover candidate buffers.
    kind: SearchKind,
}
  40
/// Represents search setup, before it is actually kicked off with [`Search::into_handle`].
enum SearchKind {
    /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match.
    Local {
        /// Filesystem used to read candidate file contents before opening them as buffers.
        fs: Arc<dyn Fs>,
        /// Visible worktrees captured when the search was constructed; their files are the
        /// candidate paths.
        worktrees: Vec<Entity<Worktree>>,
    },
    /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode.
    Remote {
        /// RPC client used for the `FindSearchCandidates` request and its cancellation message.
        client: AnyProtoClient,
        /// Project id sent along with every request to the host.
        remote_id: u64,
        /// Registry of models created on behalf of the host; retained while the search runs.
        models: Arc<Mutex<RemotelyCreatedModels>>,
    },
    /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host.
    OpenBuffersOnly,
}
  57
/// Represents the results of a project search and lets the caller obtain either the match
/// positions OR just the handles of buffers that may match the search. Grabbing the handles is
/// cheaper than obtaining full match positions, because in that case we look for at most one
/// match in each file.
///
/// The search does not start until [`SearchResultsHandle::results`] or
/// [`SearchResultsHandle::matching_buffers`] consumes the handle.
#[must_use]
pub struct SearchResultsHandle {
    /// Full results: matching buffers with all of their ranges, plus status messages such as
    /// `SearchResult::WaitingForScan` and `SearchResult::LimitReached`.
    results: Receiver<SearchResult>,
    /// Candidate buffers that may contain a match.
    matching_buffers: Receiver<Entity<Buffer>>,
    /// Starts the search; called exactly once by whichever accessor consumes the handle.
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}
  67
/// A started search: the receiving end of its output plus the task that drives it.
pub struct SearchResults<T> {
    /// Task driving the search. Keep it alive for as long as results are wanted
    /// (NOTE(review): assumes a gpui `Task` is cancelled when dropped — confirm).
    pub _task_handle: Task<()>,
    /// Stream of search output.
    pub rx: Receiver<T>,
}
  72impl SearchResultsHandle {
  73    pub fn results(self, cx: &mut App) -> SearchResults<SearchResult> {
  74        SearchResults {
  75            _task_handle: (self.trigger_search)(cx),
  76            rx: self.results,
  77        }
  78    }
  79    pub fn matching_buffers(self, cx: &mut App) -> SearchResults<Entity<Buffer>> {
  80        SearchResults {
  81            _task_handle: (self.trigger_search)(cx),
  82            rx: self.matching_buffers,
  83        }
  84    }
  85}
  86
/// Channels a `Worker` uses to discover candidate buffers; each worker gets its own clone.
#[derive(Clone)]
enum FindSearchCandidates {
    /// Local search: candidates are discovered by inspecting worktree files.
    Local {
        /// Filesystem used to read file contents when checking for a first match.
        fs: Arc<dyn Fs>,
        /// Start off with all paths in project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        /// Put another way: filter out files that can't match (without looking at file contents)
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
        /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        /// Receiving end of the queue above; every worker both feeds and drains it.
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    },
    /// Remote search: candidates come from the host, so workers only run full scans.
    Remote,
    /// Open-buffers-only search: candidates are already known, so workers only run full scans.
    OpenBuffersOnly,
}
 106
 107impl Search {
 108    pub fn local(
 109        fs: Arc<dyn Fs>,
 110        buffer_store: Entity<BufferStore>,
 111        worktree_store: Entity<WorktreeStore>,
 112        limit: usize,
 113        cx: &mut App,
 114    ) -> Self {
 115        let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
 116        Self {
 117            kind: SearchKind::Local { fs, worktrees },
 118            buffer_store,
 119            worktree_store,
 120            limit,
 121        }
 122    }
 123
 124    pub(crate) fn remote(
 125        buffer_store: Entity<BufferStore>,
 126        worktree_store: Entity<WorktreeStore>,
 127        limit: usize,
 128        client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
 129    ) -> Self {
 130        Self {
 131            kind: SearchKind::Remote {
 132                client: client_state.0,
 133                remote_id: client_state.1,
 134                models: client_state.2,
 135            },
 136            buffer_store,
 137            worktree_store,
 138            limit,
 139        }
 140    }
 141    pub(crate) fn open_buffers_only(
 142        buffer_store: Entity<BufferStore>,
 143        worktree_store: Entity<WorktreeStore>,
 144        limit: usize,
 145    ) -> Self {
 146        Self {
 147            kind: SearchKind::OpenBuffersOnly,
 148            buffer_store,
 149            worktree_store,
 150            limit,
 151        }
 152    }
 153
    /// Buffer cap used when reporting full results. Before each further buffer is reported, the
    /// search checks whether more than this many buffers were already reported. If so, it sends
    /// `SearchResult::LimitReached` and stops reporting.
    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    /// Range cap used when reporting full results. Before each further buffer is reported, the
    /// search checks whether more than this many ranges were already reported. If so, it sends
    /// `SearchResult::LimitReached` and stops reporting.
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
    /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers
    /// or full search results.
    ///
    /// Nothing is searched until the handle is consumed. At that point the pipeline assembled
    /// here runs:
    /// - unnamed buffers (open buffers with no worktree entry) are sent out as candidates first;
    /// - further candidates are discovered according to `SearchKind` (the query's loaded
    ///   buffers, a scan of the worktrees' files, or the remote host). They are sent on the
    ///   same channel that backs [`SearchResultsHandle::matching_buffers`];
    /// - only if the full-results receiver is still alive, each candidate is snapshotted and
    ///   fully searched by a pool of background workers. Its matches are then reported in
    ///   candidate order.
    pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        // Capacity of the channel that hands buffer snapshots to the workers.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        // Classify the currently open buffers:
        // - skip buffers that are not searchable or whose file was deleted;
        // - record the worktree entries of the rest (handed to the workers as `open_buffers`);
        // - buffers without an entry are searched directly, each consuming one unit of
        //   `self.limit`.
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if buffer
                .file()
                .is_some_and(|file| file.disk_state().is_deleted())
            {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                self.limit = self.limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }
        let open_buffers = Arc::new(open_buffers);
        let executor = cx.background_executor().clone();
        // `tx`/`rx` carry full search results. `grab_buffer_snapshot_*` carries candidate
        // buffers; a clone of its receiver is what `SearchResultsHandle::matching_buffers` returns.
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(move |cx: &mut App| {
            cx.spawn(async move |cx| {
                // Unnamed buffers have no path to filter on, so they are always candidates.
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
                let query = Arc::new(query);
                // Set up candidate discovery. Each arm returns the channels the workers need,
                // plus the tasks that feed candidates into `grab_buffer_snapshot_tx`.
                let (candidate_searcher, tasks) = match self.kind {
                    SearchKind::OpenBuffersOnly => {
                        // Candidates are the eligible loaded buffers listed in the query.
                        let open_buffers = cx.update(|cx| self.all_loaded_buffers(&query, cx));
                        let fill_requests = cx
                            .background_spawn(async move {
                                for buffer in open_buffers {
                                    if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await {
                                        return;
                                    }
                                }
                            })
                            .boxed_local();
                        (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests])
                    }
                    SearchKind::Local {
                        fs,
                        ref mut worktrees,
                    } => {
                        let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
                            unbounded();
                        let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                            bounded(64);
                        let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();

                        let (input_paths_tx, input_paths_rx) = unbounded();
                        // Stages (the workers sit between the first two):
                        // 1. `provide_search_paths` enumerates worktree files for the workers
                        //    and queues one oneshot per file, in enumeration order;
                        // 2. `maintain_sorted_search_results` forwards confirmed paths in that
                        //    order, up to `self.limit`;
                        // 3. `open_buffers` opens those paths and emits candidate buffers.
                        let tasks = vec![
                            cx.spawn(Self::provide_search_paths(
                                std::mem::take(worktrees),
                                query.clone(),
                                input_paths_tx,
                                sorted_search_results_tx,
                                tx.clone(),
                            ))
                            .boxed_local(),
                            Self::open_buffers(
                                self.buffer_store,
                                get_buffer_for_full_scan_rx,
                                grab_buffer_snapshot_tx,
                                cx.clone(),
                            )
                            .boxed_local(),
                            cx.background_spawn(Self::maintain_sorted_search_results(
                                sorted_search_results_rx,
                                get_buffer_for_full_scan_tx,
                                self.limit,
                            ))
                            .boxed_local(),
                        ];
                        (
                            FindSearchCandidates::Local {
                                fs,
                                confirm_contents_will_match_tx,
                                confirm_contents_will_match_rx,
                                input_paths_rx,
                            },
                            tasks,
                        )
                    }
                    SearchKind::Remote {
                        client,
                        remote_id,
                        models,
                    } => {
                        // Buffer ids that the host reports for this search arrive on `rx`.
                        let (handle, rx) = self
                            .buffer_store
                            .update(cx, |this, _| this.register_project_search_result_handle());

                        // Notifies the host that the search was cancelled, unless it is disarmed
                        // via `abort()` after every candidate has been forwarded.
                        // NOTE(review): relies on `util::defer` running its closure on drop — confirm.
                        let cancel_ongoing_search = util::defer({
                            let client = client.clone();
                            move || {
                                _ = client.send(proto::FindSearchCandidatesCancelled {
                                    project_id: remote_id,
                                    handle,
                                });
                            }
                        });
                        let request = client.request(proto::FindSearchCandidates {
                            project_id: remote_id,
                            query: Some(query.to_proto()),
                            limit: self.limit as _,
                            handle,
                        });

                        let buffer_store = self.buffer_store;
                        // Presumably keeps remotely created models alive while the search
                        // runs; released explicitly on success below.
                        let guard = cx.update(|cx| {
                            Project::retain_remotely_created_models_impl(
                                &models,
                                &buffer_store,
                                &self.worktree_store,
                                cx,
                            )
                        });

                        let issue_remote_buffers_request = cx
                            .spawn(async move |cx| {
                                let _ = maybe!(async move {
                                    request.await?;

                                    let (buffer_tx, buffer_rx) = bounded(24);

                                    // Turn each reported buffer id into a future resolving to the
                                    // buffer (presumably once it has been replicated locally).
                                    let wait_for_remote_buffers = cx.spawn(async move |cx| {
                                        while let Ok(buffer_id) = rx.recv().await {
                                            let buffer =
                                                buffer_store.update(cx, |buffer_store, cx| {
                                                    buffer_store
                                                        .wait_for_remote_buffer(buffer_id, cx)
                                                });
                                            buffer_tx.send(buffer).await?;
                                        }
                                        anyhow::Ok(())
                                    });

                                    // Forward resolved buffers as candidates. A buffer that fails
                                    // to load ends forwarding with an error.
                                    let forward_buffers = cx.background_spawn(async move {
                                        while let Ok(buffer) = buffer_rx.recv().await {
                                            let _ =
                                                grab_buffer_snapshot_tx.send(buffer.await?).await;
                                        }
                                        anyhow::Ok(())
                                    });
                                    let (left, right) = futures::future::join(
                                        wait_for_remote_buffers,
                                        forward_buffers,
                                    )
                                    .await;
                                    left?;
                                    right?;

                                    // Success: release the models and disarm the cancellation.
                                    drop(guard);
                                    cancel_ongoing_search.abort();
                                    anyhow::Ok(())
                                })
                                .await
                                .log_err();
                            })
                            .boxed_local();
                        (
                            FindSearchCandidates::Remote,
                            vec![issue_remote_buffers_request],
                        )
                    }
                };

                // If the caller consumed the handle via `matching_buffers`, the full-results
                // receiver has already been dropped, so no full scan is needed.
                let should_find_all_matches = !tx.is_closed();

                let _executor = executor.clone();
                let worker_pool = executor.spawn(async move {
                    let num_cpus = _executor.num_cpus();

                    assert!(num_cpus > 0);
                    _executor
                        .scoped(|scope| {
                            // Leave one CPU free, but always run at least one worker.
                            let worker_count = (num_cpus - 1).max(1);
                            for _ in 0..worker_count {
                                let worker = Worker {
                                    query: query.clone(),
                                    open_buffers: open_buffers.clone(),
                                    candidates: candidate_searcher.clone(),
                                    find_all_matches_rx: find_all_matches_rx.clone(),
                                };
                                scope.spawn(worker.run());
                            }

                            // Drop the pool's own handles so the channels can close once every
                            // worker is done with its clones.
                            drop(find_all_matches_rx);
                            drop(candidate_searcher);
                        })
                        .await;
                });

                let (sorted_matches_tx, sorted_matches_rx) = unbounded();
                // The caller of `into_handle` decides whether they're interested in all matches (files that matched + all matching ranges) or
                // just the files. *They are using the same stream as the guts of the project search do*.
                // This means that we cannot grab values off of that stream unless it's strictly needed for making a progress in project search.
                //
                // Grabbing buffer snapshots is only necessary when we're looking for all matches. If the caller decided that they're not interested
                // in all matches, running that task unconditionally would hinder caller's ability to observe all matching file paths.
                let buffer_snapshots = if should_find_all_matches {
                    Some(
                        Self::grab_buffer_snapshots(
                            grab_buffer_snapshot_rx,
                            find_all_matches_tx,
                            sorted_matches_tx,
                            cx.clone(),
                        )
                        .boxed_local(),
                    )
                } else {
                    drop(find_all_matches_tx);

                    None
                };
                let ensure_matches_are_reported_in_order = if should_find_all_matches {
                    Some(
                        Self::ensure_matched_ranges_are_reported_in_order(sorted_matches_rx, tx)
                            .boxed_local(),
                    )
                } else {
                    drop(tx);
                    None
                };

                // Drive workers, reporting and candidate discovery concurrently; the search task
                // finishes once all of them have.
                futures::future::join_all(
                    [worker_pool.boxed_local()]
                        .into_iter()
                        .chain(buffer_snapshots)
                        .chain(ensure_matches_are_reported_in_order)
                        .chain(tasks),
                )
                .await;
            })
        });

        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }
 410
 411    fn provide_search_paths(
 412        worktrees: Vec<Entity<Worktree>>,
 413        query: Arc<SearchQuery>,
 414        tx: Sender<InputPath>,
 415        results: Sender<oneshot::Receiver<ProjectPath>>,
 416        results_tx: Sender<SearchResult>,
 417    ) -> impl AsyncFnOnce(&mut AsyncApp) {
 418        async move |cx| {
 419            _ = maybe!(async move {
 420                let gitignored_tracker = PathInclusionMatcher::new(query.clone());
 421                let include_ignored = query.include_ignored();
 422                for worktree in worktrees {
 423                    let scan_complete = worktree.read_with(cx, |worktree, _| {
 424                        worktree.as_local().map(|local| local.scan_complete())
 425                    });
 426                    if let Some(scan_complete) = scan_complete {
 427                        _ = results_tx.send(SearchResult::WaitingForScan).await;
 428                        scan_complete.await;
 429                    }
 430
 431                    let (mut snapshot, worktree_settings) = worktree
 432                        .read_with(cx, |this, _| {
 433                            Some((this.snapshot(), this.as_local()?.settings()))
 434                        })
 435                        .context("The worktree is not local")?;
 436                    if query.include_ignored() {
 437                        // Pre-fetch all of the ignored directories as they're going to be searched.
 438                        let mut entries_to_refresh = vec![];
 439
 440                        for entry in snapshot.entries(query.include_ignored(), 0) {
 441                            if gitignored_tracker.should_scan_gitignored_dir(
 442                                entry,
 443                                &snapshot,
 444                                &worktree_settings,
 445                            ) {
 446                                entries_to_refresh.push(entry.path.clone());
 447                            }
 448                        }
 449                        let barrier = worktree.update(cx, |this, _| {
 450                            let local = this.as_local_mut()?;
 451                            let barrier = entries_to_refresh
 452                                .into_iter()
 453                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
 454                                .collect::<Vec<_>>();
 455                            Some(barrier)
 456                        });
 457                        if let Some(barriers) = barrier {
 458                            futures::future::join_all(barriers).await;
 459                        }
 460                        snapshot = worktree.read_with(cx, |this, _| this.snapshot());
 461                    }
 462                    let tx = tx.clone();
 463                    let results = results.clone();
 464
 465                    cx.background_executor()
 466                        .spawn(async move {
 467                            for entry in snapshot.files(include_ignored, 0) {
 468                                let (should_scan_tx, should_scan_rx) = oneshot::channel();
 469
 470                                let Ok(_) = tx
 471                                    .send(InputPath {
 472                                        entry: entry.clone(),
 473                                        snapshot: snapshot.clone(),
 474                                        should_scan_tx,
 475                                    })
 476                                    .await
 477                                else {
 478                                    return;
 479                                };
 480                                if results.send(should_scan_rx).await.is_err() {
 481                                    return;
 482                                };
 483                            }
 484                        })
 485                        .await;
 486                }
 487                anyhow::Ok(())
 488            })
 489            .await;
 490        }
 491    }
 492
 493    async fn maintain_sorted_search_results(
 494        rx: Receiver<oneshot::Receiver<ProjectPath>>,
 495        paths_for_full_scan: Sender<ProjectPath>,
 496        limit: usize,
 497    ) {
 498        let mut rx = pin!(rx);
 499        let mut matched = 0;
 500        while let Some(mut next_path_result) = rx.next().await {
 501            let Some(successful_path) = next_path_result.next().await else {
 502                // This file did not produce a match, hence skip it.
 503                continue;
 504            };
 505            if paths_for_full_scan.send(successful_path).await.is_err() {
 506                return;
 507            };
 508            matched += 1;
 509            if matched >= limit {
 510                break;
 511            }
 512        }
 513    }
 514
 515    /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
 516    async fn open_buffers(
 517        buffer_store: Entity<BufferStore>,
 518        rx: Receiver<ProjectPath>,
 519        find_all_matches_tx: Sender<Entity<Buffer>>,
 520        mut cx: AsyncApp,
 521    ) {
 522        let mut rx = pin!(rx.ready_chunks(64));
 523        _ = maybe!(async move {
 524            while let Some(requested_paths) = rx.next().await {
 525                let mut buffers = buffer_store.update(&mut cx, |this, cx| {
 526                    requested_paths
 527                        .into_iter()
 528                        .map(|path| this.open_buffer(path, cx))
 529                        .collect::<FuturesOrdered<_>>()
 530                });
 531
 532                while let Some(buffer) = buffers.next().await {
 533                    if let Some(buffer) = buffer.log_err() {
 534                        find_all_matches_tx.send(buffer).await?;
 535                    }
 536                }
 537            }
 538            Result::<_, anyhow::Error>::Ok(())
 539        })
 540        .await;
 541    }
 542
 543    async fn grab_buffer_snapshots(
 544        rx: Receiver<Entity<Buffer>>,
 545        find_all_matches_tx: Sender<(
 546            Entity<Buffer>,
 547            BufferSnapshot,
 548            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 549        )>,
 550        results: Sender<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 551        mut cx: AsyncApp,
 552    ) {
 553        _ = maybe!(async move {
 554            while let Ok(buffer) = rx.recv().await {
 555                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot());
 556                let (tx, rx) = oneshot::channel();
 557                find_all_matches_tx.send((buffer, snapshot, tx)).await?;
 558                results.send(rx).await?;
 559            }
 560            debug_assert!(rx.is_empty());
 561            Result::<_, anyhow::Error>::Ok(())
 562        })
 563        .await;
 564    }
 565
 566    async fn ensure_matched_ranges_are_reported_in_order(
 567        rx: Receiver<oneshot::Receiver<(Entity<Buffer>, Vec<Range<language::Anchor>>)>>,
 568        tx: Sender<SearchResult>,
 569    ) {
 570        use postage::stream::Stream;
 571        _ = maybe!(async move {
 572            let mut matched_buffers = 0;
 573            let mut matches = 0;
 574            while let Ok(mut next_buffer_matches) = rx.recv().await {
 575                let Some((buffer, ranges)) = next_buffer_matches.recv().await else {
 576                    continue;
 577                };
 578
 579                if matched_buffers > Search::MAX_SEARCH_RESULT_FILES
 580                    || matches > Search::MAX_SEARCH_RESULT_RANGES
 581                {
 582                    _ = tx.send(SearchResult::LimitReached).await;
 583                    break;
 584                }
 585                matched_buffers += 1;
 586                matches += ranges.len();
 587
 588                _ = tx.send(SearchResult::Buffer { buffer, ranges }).await?;
 589            }
 590            anyhow::Ok(())
 591        })
 592        .await;
 593    }
 594
 595    fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec<Entity<Buffer>> {
 596        let worktree_store = self.worktree_store.read(cx);
 597        let mut buffers = search_query
 598            .buffers()
 599            .into_iter()
 600            .flatten()
 601            .filter(|buffer| {
 602                let b = buffer.read(cx);
 603                if let Some(file) = b.file() {
 604                    if file.disk_state().is_deleted() {
 605                        return false;
 606                    }
 607                    if !search_query.match_path(file.path()) {
 608                        return false;
 609                    }
 610                    if !search_query.include_ignored()
 611                        && let Some(entry) = b
 612                            .entry_id(cx)
 613                            .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
 614                        && entry.is_ignored
 615                    {
 616                        return false;
 617                    }
 618                }
 619                true
 620            })
 621            .cloned()
 622            .collect::<Vec<_>>();
 623        buffers.sort_by(|a, b| {
 624            let a = a.read(cx);
 625            let b = b.read(cx);
 626            match (a.file(), b.file()) {
 627                (None, None) => a.remote_id().cmp(&b.remote_id()),
 628                (None, Some(_)) => std::cmp::Ordering::Less,
 629                (Some(_), None) => std::cmp::Ordering::Greater,
 630                (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)),
 631            }
 632        });
 633
 634        buffers
 635    }
 636}
 637
/// One background search worker. `Search::into_handle` runs several of them concurrently,
/// each holding clones of the same channels.
struct Worker {
    /// The query being searched for.
    query: Arc<SearchQuery>,
    /// Worktree entries that were open as buffers when the search was prepared.
    open_buffers: Arc<HashSet<ProjectEntryId>>,
    /// Channels for candidate discovery. In remote and open-buffers-only searches,
    /// `Worker::run` replaces them with already-closed dummy channels.
    candidates: FindSearchCandidates,
    /// Buffer snapshots to scan in full. Each comes with a oneshot sender on which the buffer and
    /// all of its matched ranges (as anchors) are reported back.
    find_all_matches_rx: Receiver<(
        Entity<Buffer>,
        BufferSnapshot,
        oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
    )>,
}
 650
impl Worker {
    /// Serves requests from all pipeline stages until every input stream is exhausted.
    ///
    /// `select_biased!` polls the branches in declaration order, so a ready request is picked
    /// in this priority:
    /// 1. full scans;
    /// 2. first-match checks;
    /// 3. new paths.
    async fn run(self) {
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            fs,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                fs,
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                Some(fs),
            ),
            // Only the full-scan stage is used here. The other ends of these dummy channels are
            // dropped immediately, so their streams finish at once.
            FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => {
                (unbounded().1, unbounded().1, unbounded().0, None)
            }
        };
        // NOTE(review): the example below presumably illustrates that requests are taken one at
        // a time, so a slow file ties up only the worker that took it — confirm intent.
        // WorkerA: grabs a request for "find all matches in file/a" <- takes 5 minutes
        // right after: WorkerB: grabs a request for "find all matches in file/b" <- takes 5 seconds
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: &self.query,
                open_entries: &self.open_buffers,
                fs: fs.as_deref(),
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
            };
            // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
            // steps straight away. Another worker might be about to produce a value that will
            // be pushed there, thus we'll replace current worker's pipe with a dummy one.
            // That way, we'll only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    let Some(matches) = find_all_matches else {
                        continue;
                    };
                    handler.handle_find_all_matches(matches).await;
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this is not producing values, close the upstream.
                        // (Replacing the sender drops this worker's handle to the first-match queue;
                        // the queue closes once every worker has done the same.)
                        confirm_contents_will_match_tx = bounded(1).0;
                    }

                 }
                 // All streams are exhausted: nothing left for this worker to do.
                 complete => {
                     break
                },

            }
        }
    }
}
 720
 721struct RequestHandler<'worker> {
 722    query: &'worker SearchQuery,
 723    fs: Option<&'worker dyn Fs>,
 724    open_entries: &'worker HashSet<ProjectEntryId>,
 725    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
 726}
 727
 728impl RequestHandler<'_> {
 729    async fn handle_find_all_matches(
 730        &self,
 731        (buffer, snapshot, mut report_matches): (
 732            Entity<Buffer>,
 733            BufferSnapshot,
 734            oneshot::Sender<(Entity<Buffer>, Vec<Range<language::Anchor>>)>,
 735        ),
 736    ) {
 737        let ranges = self
 738            .query
 739            .search(&snapshot, None)
 740            .await
 741            .iter()
 742            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
 743            .collect::<Vec<_>>();
 744
 745        _ = report_matches.send((buffer, ranges)).await;
 746    }
 747
 748    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
 749        async move {
 750            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
 751            let Some(file) = self
 752                .fs
 753                .context("Trying to query filesystem in remote project search")?
 754                .open_sync(&abs_path)
 755                .await
 756                .log_err()
 757            else {
 758                return anyhow::Ok(());
 759            };
 760
 761            let mut file = BufReader::new(file);
 762            let file_start = file.fill_buf()?;
 763
 764            if let Err(Some(starting_position)) =
 765                std::str::from_utf8(file_start).map_err(|e| e.error_len())
 766            {
 767                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
 768                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
 769                log::debug!(
 770                    "Invalid UTF-8 sequence in file {abs_path:?} \
 771                    at byte position {starting_position}"
 772                );
 773                return Ok(());
 774            }
 775
 776            if self.query.detect(file).await.unwrap_or(false) {
 777                // Yes, we should scan the whole file.
 778                entry.should_scan_tx.send(entry.path).await?;
 779            }
 780            Ok(())
 781        }
 782        .await
 783        .ok();
 784    }
 785
 786    async fn handle_scan_path(&self, req: InputPath) {
 787        _ = maybe!(async move {
 788            let InputPath {
 789                entry,
 790                snapshot,
 791                mut should_scan_tx,
 792            } = req;
 793
 794            if entry.is_fifo || !entry.is_file() {
 795                return Ok(());
 796            }
 797
 798            if self.query.filters_path() {
 799                let matched_path = if self.query.match_full_paths() {
 800                    let mut full_path = snapshot.root_name().to_owned();
 801                    full_path.push(&entry.path);
 802                    self.query.match_path(&full_path)
 803                } else {
 804                    self.query.match_path(&entry.path)
 805                };
 806                if !matched_path {
 807                    return Ok(());
 808                }
 809            }
 810
 811            if self.open_entries.contains(&entry.id) {
 812                // The buffer is already in memory and that's the version we want to scan;
 813                // hence skip the dilly-dally and look for all matches straight away.
 814                should_scan_tx
 815                    .send(ProjectPath {
 816                        worktree_id: snapshot.id(),
 817                        path: entry.path.clone(),
 818                    })
 819                    .await?;
 820            } else {
 821                self.confirm_contents_will_match_tx
 822                    .send(MatchingEntry {
 823                        should_scan_tx: should_scan_tx,
 824                        worktree_root: snapshot.abs_path().clone(),
 825                        path: ProjectPath {
 826                            worktree_id: snapshot.id(),
 827                            path: entry.path.clone(),
 828                        },
 829                    })
 830                    .await?;
 831            }
 832
 833            anyhow::Ok(())
 834        })
 835        .await;
 836    }
 837}
 838
/// A candidate worktree entry, handed to `RequestHandler::handle_scan_path` for filtering.
struct InputPath {
    /// The worktree entry under consideration.
    entry: Entry,
    /// Snapshot of the entry's worktree. It supplies the worktree id, root name and absolute
    /// root path used when filtering and forwarding the entry.
    snapshot: Snapshot,
    /// Used to send the entry's project path when it should be searched in full. Entries that
    /// are filtered out drop it without sending; presumably the receiver treats that as
    /// "skip" (NOTE(review): confirm against the consumer).
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
 844
/// A file whose on-disk contents must be checked for at least one match before it is
/// searched in full. Consumed by `RequestHandler::handle_find_first_match`.
struct MatchingEntry {
    /// Absolute path of the worktree root; joined with `path.path` to locate the file.
    worktree_root: Arc<Path>,
    /// Project path of the file; forwarded through `should_scan_tx` when a match is found.
    path: ProjectPath,
    /// Used to send `path` once the file is known to contain at least one match.
    should_scan_tx: oneshot::Sender<ProjectPath>,
}
 850
 851/// This struct encapsulates the logic to decide whether a given gitignored directory should be
 852/// scanned based on include/exclude patterns of a search query (as include/exclude parameters may match paths inside it).
 853/// It is kind-of doing an inverse of glob. Given a glob pattern like `src/**/` and a parent path like `src`, we need to decide whether the parent
 854/// may contain glob hits.
 855pub struct PathInclusionMatcher {
 856    included: BTreeSet<PathBuf>,
 857    query: Arc<SearchQuery>,
 858}
 859
 860impl PathInclusionMatcher {
 861    pub fn new(query: Arc<SearchQuery>) -> Self {
 862        let mut included = BTreeSet::new();
 863        // To do an inverse glob match, we split each glob into it's prefix and the glob part.
 864        // For example, `src/**/*.rs` becomes `src/` and `**/*.rs`. The glob part gets dropped.
 865        // Then, when checking whether a given directory should be scanned, we check whether it is a non-empty substring of any glob prefix.
 866        if query.filters_path() {
 867            included.extend(
 868                query
 869                    .files_to_include()
 870                    .sources()
 871                    .flat_map(|glob| Some(wax::Glob::new(glob).ok()?.partition().0)),
 872            );
 873        }
 874        Self { included, query }
 875    }
 876
 877    pub fn should_scan_gitignored_dir(
 878        &self,
 879        entry: &Entry,
 880        snapshot: &Snapshot,
 881        worktree_settings: &WorktreeSettings,
 882    ) -> bool {
 883        if !entry.is_ignored || !entry.kind.is_unloaded() {
 884            return false;
 885        }
 886        if !self.query.include_ignored() {
 887            return false;
 888        }
 889        if worktree_settings.is_path_excluded(&entry.path) {
 890            return false;
 891        }
 892        if !self.query.filters_path() {
 893            return true;
 894        }
 895
 896        let as_abs_path = LazyCell::new(move || snapshot.absolutize(&entry.path));
 897        let entry_path = &entry.path;
 898        // 3. Check Exclusions (Pruning)
 899        // If the current path is a child of an excluded path, we stop.
 900        let is_excluded = self.path_is_definitely_excluded(&entry_path, snapshot);
 901
 902        if is_excluded {
 903            return false;
 904        }
 905
 906        // 4. Check Inclusions (Traversal)
 907        if self.included.is_empty() {
 908            return true;
 909        }
 910
 911        // We scan if the current path is a descendant of an include prefix
 912        // OR if the current path is an ancestor of an include prefix (we need to go deeper to find it).
 913        let is_included = self.included.iter().any(|prefix| {
 914            let (prefix_matches_entry, entry_matches_prefix) = if prefix.is_absolute() {
 915                (
 916                    prefix.starts_with(&**as_abs_path),
 917                    as_abs_path.starts_with(prefix),
 918                )
 919            } else {
 920                RelPath::new(prefix, snapshot.path_style()).map_or((false, false), |prefix| {
 921                    (
 922                        prefix.starts_with(entry_path),
 923                        entry_path.starts_with(&prefix),
 924                    )
 925                })
 926            };
 927
 928            // Logic:
 929            // 1. entry_matches_prefix: We are inside the target zone (e.g. glob: src/, current: src/lib/). Keep scanning.
 930            // 2. prefix_matches_entry: We are above the target zone (e.g. glob: src/foo/, current: src/). Keep scanning to reach foo.
 931            prefix_matches_entry || entry_matches_prefix
 932        });
 933
 934        is_included
 935    }
 936    fn path_is_definitely_excluded(&self, path: &RelPath, snapshot: &Snapshot) -> bool {
 937        if !self.query.files_to_exclude().sources().next().is_none() {
 938            let mut path = if self.query.match_full_paths() {
 939                let mut full_path = snapshot.root_name().to_owned();
 940                full_path.push(path);
 941                full_path
 942            } else {
 943                path.to_owned()
 944            };
 945            loop {
 946                if self.query.files_to_exclude().is_match(&path) {
 947                    return true;
 948                } else if !path.pop() {
 949                    return false;
 950                }
 951            }
 952        } else {
 953            false
 954        }
 955    }
 956}
 957
/// Payload of a flush request: `true` when the batching task should stop after sending the
/// pending batch, `false` for an ordinary (timer-driven) flush.
type IsTerminating = bool;
/// Adaptive batcher that groups pushed items into `Vec` batches.
///
/// Each batch is flushed after a delay measured from its first item. The delay starts short
/// and grows logarithmically, up to a cap, with the total number of items produced so far.
/// Early results are therefore delivered with low latency, while later items are grouped
/// into larger batches; presumably the goal is less per-message (e.g. RPC) overhead.
pub struct AdaptiveBatcher<T> {
    /// Feeds individual items to the batching task.
    items: Sender<T>,
    /// Sends flush requests to the batching task (see [`IsTerminating`]).
    flush_batch: Sender<IsTerminating>,
    /// The background batching task; held here so it keeps running for the batcher's
    /// lifetime.
    _batch_task: Task<()>,
}
 967
 968impl<T: 'static + Send> AdaptiveBatcher<T> {
 969    pub fn new(cx: &BackgroundExecutor) -> (Self, Receiver<Vec<T>>) {
 970        let (items, rx) = unbounded();
 971        let (batch_tx, batch_rx) = unbounded();
 972        let (flush_batch_tx, flush_batch_rx) = unbounded();
 973        let flush_batch = flush_batch_tx.clone();
 974        let executor = cx.clone();
 975        let _batch_task = cx.spawn_with_priority(gpui::Priority::High, async move {
 976            let mut current_batch = vec![];
 977            let mut items_produced_so_far = 0_u64;
 978
 979            let mut _schedule_flush_after_delay: Option<Task<()>> = None;
 980            let _time_elapsed_since_start_of_search = std::time::Instant::now();
 981            let mut flush = pin!(flush_batch_rx);
 982            let mut terminating = false;
 983            loop {
 984                select_biased! {
 985                    item = rx.recv().fuse() => {
 986                        match item {
 987                            Ok(new_item) => {
 988                                let is_fresh_batch = current_batch.is_empty();
 989                                items_produced_so_far += 1;
 990                                current_batch.push(new_item);
 991                                if is_fresh_batch {
 992                                    // Chosen arbitrarily based on some experimentation with plots.
 993                                    let desired_duration_ms = (20 * (items_produced_so_far + 2).ilog2() as u64).min(300);
 994                                    let desired_duration = Duration::from_millis(desired_duration_ms);
 995                                    let _executor = executor.clone();
 996                                    let _flush = flush_batch_tx.clone();
 997                                    let new_timer = executor.spawn_with_priority(Priority::High, async move {
 998                                        _executor.timer(desired_duration).await;
 999                                        _ = _flush.send(false).await;
1000                                    });
1001                                    _schedule_flush_after_delay = Some(new_timer);
1002                                }
1003                            }
1004                            Err(_) => {
1005                                // Items channel closed - send any remaining batch before exiting
1006                                if !current_batch.is_empty() {
1007                                    _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
1008                                }
1009                                break;
1010                            }
1011                        }
1012                    }
1013                    should_break_afterwards = flush.next() => {
1014                        if !current_batch.is_empty() {
1015                            _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
1016                            _schedule_flush_after_delay = None;
1017                        }
1018                        if should_break_afterwards.unwrap_or_default() {
1019                            terminating = true;
1020                        }
1021                    }
1022                    complete => {
1023                        break;
1024                    }
1025                }
1026                if terminating {
1027                    // Drain any remaining items before exiting
1028                    while let Ok(new_item) = rx.try_recv() {
1029                        current_batch.push(new_item);
1030                    }
1031                    if !current_batch.is_empty() {
1032                        _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
1033                    }
1034                    break;
1035                }
1036            }
1037        });
1038        let this = Self {
1039            items,
1040            _batch_task,
1041            flush_batch,
1042        };
1043        (this, batch_rx)
1044    }
1045
1046    pub async fn push(&self, item: T) {
1047        _ = self.items.send(item).await;
1048    }
1049
1050    pub async fn flush(self) {
1051        _ = self.flush_batch.send(true).await;
1052        self._batch_task.await;
1053    }
1054}