1use std::{
2 io::{BufRead, BufReader},
3 path::Path,
4 pin::pin,
5 sync::{
6 Arc,
7 atomic::{AtomicUsize, Ordering},
8 },
9};
10
11use anyhow::Context;
12use collections::HashSet;
13use fs::Fs;
14use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
15use gpui::{App, AppContext, AsyncApp, Entity, Task};
16use language::{Buffer, BufferSnapshot};
17use parking_lot::Mutex;
18use postage::oneshot;
19use rpc::{AnyProtoClient, proto};
20use smol::{
21 channel::{Receiver, Sender, bounded, unbounded},
22 future::FutureExt,
23};
24
25use text::BufferId;
26use util::{ResultExt, maybe};
27use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};
28
29use crate::{
30 Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
31 buffer_store::BufferStore,
32 search::{SearchQuery, SearchResult},
33 worktree_store::WorktreeStore,
34};
35
36pub struct Search {
37 buffer_store: Entity<BufferStore>,
38 worktree_store: Entity<WorktreeStore>,
39 limit: usize,
40 kind: SearchKind,
41}
42
43enum SearchKind {
44 Local {
45 fs: Arc<dyn Fs>,
46 worktrees: Vec<Entity<Worktree>>,
47 },
48 Remote {
49 client: AnyProtoClient,
50 remote_id: u64,
51 models: Arc<Mutex<RemotelyCreatedModels>>,
52 },
53}
54
55/// Represents results of project search and allows one to either obtain match positions OR
56/// just the handles to buffers that may match the search.
57#[must_use]
58pub struct SearchResultsHandle {
59 results: Receiver<SearchResult>,
60 matching_buffers: Receiver<Entity<Buffer>>,
61 trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
62}
63
64impl SearchResultsHandle {
65 pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
66 (self.trigger_search)(cx).detach();
67 self.results
68 }
69 pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
70 (self.trigger_search)(cx).detach();
71 self.matching_buffers
72 }
73}
74
75#[derive(Clone)]
76enum FindSearchCandidates {
77 Local {
78 fs: Arc<dyn Fs>,
79 /// Start off with all paths in project and filter them based on:
80 /// - Include filters
81 /// - Exclude filters
82 /// - Only open buffers
83 /// - Scan ignored files
84 /// Put another way: filter out files that can't match (without looking at file contents)
85 input_paths_rx: Receiver<InputPath>,
86 /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
87 /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
88 confirm_contents_will_match_tx: Sender<MatchingEntry>,
89 confirm_contents_will_match_rx: Receiver<MatchingEntry>,
90 /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
91 /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
92 get_buffer_for_full_scan_tx: Sender<ProjectPath>,
93 },
94 Remote,
95}
96
97impl Search {
98 pub fn local(
99 fs: Arc<dyn Fs>,
100 buffer_store: Entity<BufferStore>,
101 worktree_store: Entity<WorktreeStore>,
102
103 limit: usize,
104 cx: &mut App,
105 ) -> Self {
106 let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect();
107 Self {
108 kind: SearchKind::Local { fs, worktrees },
109 buffer_store,
110 worktree_store,
111 limit,
112 }
113 }
114
115 pub(crate) fn remote(
116 buffer_store: Entity<BufferStore>,
117 worktree_store: Entity<WorktreeStore>,
118 limit: usize,
119 client_state: (AnyProtoClient, u64, Arc<Mutex<RemotelyCreatedModels>>),
120 ) -> Self {
121 Self {
122 kind: SearchKind::Remote {
123 client: client_state.0,
124 remote_id: client_state.1,
125 models: client_state.2,
126 },
127 buffer_store,
128 worktree_store,
129 limit,
130 }
131 }
132
133 pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
134 pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
135 /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
136 /// or full search results.
137 pub fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
138 let mut open_buffers = HashSet::default();
139 let mut unnamed_buffers = Vec::new();
140 const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
141 let buffers = self.buffer_store.read(cx);
142 for handle in buffers.buffers() {
143 let buffer = handle.read(cx);
144 if !buffers.is_searchable(&buffer.remote_id()) {
145 continue;
146 } else if let Some(entry_id) = buffer.entry_id(cx) {
147 open_buffers.insert(entry_id);
148 } else {
149 self.limit -= self.limit.saturating_sub(1);
150 unnamed_buffers.push(handle)
151 };
152 }
153 let executor = cx.background_executor().clone();
154 let (tx, rx) = unbounded();
155 let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
156 let matching_buffers = grab_buffer_snapshot_rx.clone();
157 let trigger_search = Box::new(move |cx: &mut App| {
158 cx.spawn(async move |cx| {
159 for buffer in unnamed_buffers {
160 _ = grab_buffer_snapshot_tx.send(buffer).await;
161 }
162
163 let (find_all_matches_tx, find_all_matches_rx) =
164 bounded(MAX_CONCURRENT_BUFFER_OPENS);
165
166 let (candidate_searcher, tasks) = match self.kind {
167 SearchKind::Local {
168 fs,
169 ref mut worktrees,
170 } => {
171 let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
172 unbounded();
173 let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
174 bounded(64);
175 let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
176
177 let (input_paths_tx, input_paths_rx) = unbounded();
178
179 let tasks = vec![
180 cx.spawn(Self::provide_search_paths(
181 std::mem::take(worktrees),
182 query.include_ignored(),
183 input_paths_tx,
184 sorted_search_results_tx,
185 ))
186 .boxed_local(),
187 Self::open_buffers(
188 &self.buffer_store,
189 get_buffer_for_full_scan_rx,
190 grab_buffer_snapshot_tx,
191 cx.clone(),
192 )
193 .boxed_local(),
194 cx.background_spawn(Self::maintain_sorted_search_results(
195 sorted_search_results_rx,
196 get_buffer_for_full_scan_tx.clone(),
197 self.limit,
198 ))
199 .boxed_local(),
200 ];
201 (
202 FindSearchCandidates::Local {
203 fs,
204 get_buffer_for_full_scan_tx,
205 confirm_contents_will_match_tx,
206 confirm_contents_will_match_rx,
207 input_paths_rx,
208 },
209 tasks,
210 )
211 }
212 SearchKind::Remote {
213 client,
214 remote_id,
215 models,
216 } => {
217 let request = client.request(proto::FindSearchCandidates {
218 project_id: remote_id,
219 query: Some(query.to_proto()),
220 limit: self.limit as _,
221 });
222 let Ok(guard) = cx.update(|cx| {
223 Project::retain_remotely_created_models_impl(
224 &models,
225 &self.buffer_store,
226 &self.worktree_store,
227 cx,
228 )
229 }) else {
230 return;
231 };
232 let buffer_store = self.buffer_store.downgrade();
233 let issue_remote_buffers_request = cx
234 .spawn(async move |cx| {
235 let _ = maybe!(async move {
236 let response = request.await?;
237
238 for buffer_id in response.buffer_ids {
239 let buffer_id = BufferId::new(buffer_id)?;
240 let buffer = buffer_store
241 .update(cx, |buffer_store, cx| {
242 buffer_store.wait_for_remote_buffer(buffer_id, cx)
243 })?
244 .await?;
245 let _ = grab_buffer_snapshot_tx.send(buffer).await;
246 }
247
248 drop(guard);
249 anyhow::Ok(())
250 })
251 .await
252 .log_err();
253 })
254 .boxed_local();
255 (
256 FindSearchCandidates::Remote,
257 vec![issue_remote_buffers_request],
258 )
259 }
260 };
261
262 let matches_count = AtomicUsize::new(0);
263 let matched_buffer_count = AtomicUsize::new(0);
264
265 let worker_pool = executor.scoped(|scope| {
266 let num_cpus = executor.num_cpus();
267
268 assert!(num_cpus > 0);
269 for _ in 0..executor.num_cpus() - 1 {
270 let worker = Worker {
271 query: &query,
272 open_buffers: &open_buffers,
273 matched_buffer_count: &matched_buffer_count,
274 matches_count: &matches_count,
275 candidates: candidate_searcher.clone(),
276 find_all_matches_rx: find_all_matches_rx.clone(),
277 publish_matches: tx.clone(),
278 };
279 scope.spawn(worker.run());
280 }
281 drop(tx);
282 drop(find_all_matches_rx);
283 drop(candidate_searcher);
284 });
285
286 let buffer_snapshots = Self::grab_buffer_snapshots(
287 grab_buffer_snapshot_rx,
288 find_all_matches_tx,
289 cx.clone(),
290 );
291 futures::future::join_all(
292 [worker_pool.boxed_local(), buffer_snapshots.boxed_local()]
293 .into_iter()
294 .chain(tasks),
295 )
296 .await;
297 })
298 });
299
300 SearchResultsHandle {
301 results: rx,
302 matching_buffers,
303 trigger_search,
304 }
305 }
306
307 fn provide_search_paths(
308 worktrees: Vec<Entity<Worktree>>,
309 include_ignored: bool,
310 tx: Sender<InputPath>,
311 results: Sender<oneshot::Receiver<ProjectPath>>,
312 ) -> impl AsyncFnOnce(&mut AsyncApp) {
313 async move |cx| {
314 _ = maybe!(async move {
315 for worktree in worktrees {
316 let (mut snapshot, worktree_settings) = worktree
317 .read_with(cx, |this, _| {
318 Some((this.snapshot(), this.as_local()?.settings()))
319 })?
320 .context("The worktree is not local")?;
321 if include_ignored {
322 // Pre-fetch all of the ignored directories as they're going to be searched.
323 let mut entries_to_refresh = vec![];
324 for entry in snapshot.entries(include_ignored, 0) {
325 if entry.is_ignored && entry.kind.is_unloaded() {
326 if !worktree_settings.is_path_excluded(&entry.path) {
327 entries_to_refresh.push(entry.path.clone());
328 }
329 }
330 }
331 let barrier = worktree.update(cx, |this, _| {
332 let local = this.as_local_mut()?;
333 let barrier = entries_to_refresh
334 .into_iter()
335 .map(|path| local.add_path_prefix_to_scan(path).into_future())
336 .collect::<Vec<_>>();
337 Some(barrier)
338 })?;
339 if let Some(barriers) = barrier {
340 futures::future::join_all(barriers).await;
341 }
342 snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
343 }
344 cx.background_executor()
345 .scoped(|scope| {
346 scope.spawn(async {
347 for entry in snapshot.files(include_ignored, 0) {
348 let (should_scan_tx, should_scan_rx) = oneshot::channel();
349 let Ok(_) = tx
350 .send(InputPath {
351 entry: entry.clone(),
352 snapshot: snapshot.clone(),
353 should_scan_tx,
354 })
355 .await
356 else {
357 return;
358 };
359 if results.send(should_scan_rx).await.is_err() {
360 return;
361 };
362 }
363 })
364 })
365 .await;
366 }
367 anyhow::Ok(())
368 })
369 .await;
370 }
371 }
372
373 async fn maintain_sorted_search_results(
374 rx: Receiver<oneshot::Receiver<ProjectPath>>,
375 paths_for_full_scan: Sender<ProjectPath>,
376 limit: usize,
377 ) {
378 let mut rx = pin!(rx);
379 let mut matched = 0;
380 while let Some(mut next_path_result) = rx.next().await {
381 let Some(successful_path) = next_path_result.next().await else {
382 // This math did not produce a match, hence skip it.
383 continue;
384 };
385 if paths_for_full_scan.send(successful_path).await.is_err() {
386 return;
387 };
388 matched += 1;
389 if matched >= limit {
390 break;
391 }
392 }
393 }
394
395 /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
396 async fn open_buffers(
397 buffer_store: &Entity<BufferStore>,
398 rx: Receiver<ProjectPath>,
399 find_all_matches_tx: Sender<Entity<Buffer>>,
400 mut cx: AsyncApp,
401 ) {
402 let mut rx = pin!(rx.ready_chunks(64));
403 _ = maybe!(async move {
404 while let Some(requested_paths) = rx.next().await {
405 let mut buffers = buffer_store.update(&mut cx, |this, cx| {
406 requested_paths
407 .into_iter()
408 .map(|path| this.open_buffer(path, cx))
409 .collect::<FuturesOrdered<_>>()
410 })?;
411
412 while let Some(buffer) = buffers.next().await {
413 if let Some(buffer) = buffer.log_err() {
414 find_all_matches_tx.send(buffer).await?;
415 }
416 }
417 }
418 Result::<_, anyhow::Error>::Ok(())
419 })
420 .await;
421 }
422
423 async fn grab_buffer_snapshots(
424 rx: Receiver<Entity<Buffer>>,
425 find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
426 mut cx: AsyncApp,
427 ) {
428 _ = maybe!(async move {
429 while let Ok(buffer) = rx.recv().await {
430 let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
431 find_all_matches_tx.send((buffer, snapshot)).await?;
432 }
433 Result::<_, anyhow::Error>::Ok(())
434 })
435 .await;
436 }
437}
438
439struct Worker<'search> {
440 query: &'search SearchQuery,
441 matched_buffer_count: &'search AtomicUsize,
442 matches_count: &'search AtomicUsize,
443 open_buffers: &'search HashSet<ProjectEntryId>,
444 candidates: FindSearchCandidates,
445 /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
446 find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
447 /// Cool, we have results; let's share them with the world.
448 publish_matches: Sender<SearchResult>,
449}
450
451impl Worker<'_> {
452 async fn run(mut self) {
453 let (
454 input_paths_rx,
455 confirm_contents_will_match_rx,
456 mut confirm_contents_will_match_tx,
457 mut get_buffer_for_full_scan_tx,
458 fs,
459 ) = match self.candidates {
460 FindSearchCandidates::Local {
461 fs,
462 input_paths_rx,
463 confirm_contents_will_match_rx,
464 confirm_contents_will_match_tx,
465 get_buffer_for_full_scan_tx,
466 } => (
467 input_paths_rx,
468 confirm_contents_will_match_rx,
469 confirm_contents_will_match_tx,
470 get_buffer_for_full_scan_tx,
471 Some(fs),
472 ),
473 FindSearchCandidates::Remote => (
474 unbounded().1,
475 unbounded().1,
476 unbounded().0,
477 unbounded().0,
478 None,
479 ),
480 };
481 let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
482 let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
483 let mut scan_path = pin!(input_paths_rx.fuse());
484
485 loop {
486 let handler = RequestHandler {
487 query: self.query,
488 open_entries: &self.open_buffers,
489 fs: fs.as_deref(),
490 matched_buffer_count: self.matched_buffer_count,
491 matches_count: self.matches_count,
492 confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
493 get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
494 publish_matches: &self.publish_matches,
495 };
496 // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent
497 // steps straight away. Another worker might be about to produce a value that will
498 // be pushed there, thus we'll replace current worker's pipe with a dummy one.
499 // That way, we'll only ever close a next-stage channel when ALL workers do so.
500 select_biased! {
501 find_all_matches = find_all_matches.next() => {
502
503 if self.publish_matches.is_closed() {
504 break;
505 }
506 let Some(matches) = find_all_matches else {
507 self.publish_matches = bounded(1).0;
508 continue;
509 };
510 let result = handler.handle_find_all_matches(matches).await;
511 if let Some(_should_bail) = result {
512
513 self.publish_matches = bounded(1).0;
514 continue;
515 }
516 },
517 find_first_match = find_first_match.next() => {
518 if let Some(buffer_with_at_least_one_match) = find_first_match {
519 handler.handle_find_first_match(buffer_with_at_least_one_match).await;
520 } else {
521 get_buffer_for_full_scan_tx = bounded(1).0;
522 }
523
524 },
525 scan_path = scan_path.next() => {
526 if let Some(path_to_scan) = scan_path {
527 handler.handle_scan_path(path_to_scan).await;
528 } else {
529 // If we're the last worker to notice that this is not producing values, close the upstream.
530 confirm_contents_will_match_tx = bounded(1).0;
531 }
532
533 }
534 complete => {
535 break
536 },
537
538 }
539 }
540 }
541}
542
543struct RequestHandler<'worker> {
544 query: &'worker SearchQuery,
545 fs: Option<&'worker dyn Fs>,
546 open_entries: &'worker HashSet<ProjectEntryId>,
547 matched_buffer_count: &'worker AtomicUsize,
548 matches_count: &'worker AtomicUsize,
549
550 confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
551 get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
552 publish_matches: &'worker Sender<SearchResult>,
553}
554
/// Marker returned by a full scan to signal that one of the global result limits was exceeded.
struct LimitReached;
556
557impl RequestHandler<'_> {
558 async fn handle_find_all_matches(
559 &self,
560 (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
561 ) -> Option<LimitReached> {
562 let ranges = self
563 .query
564 .search(&snapshot, None)
565 .await
566 .iter()
567 .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
568 .collect::<Vec<_>>();
569
570 let matched_ranges = ranges.len();
571 if self.matched_buffer_count.fetch_add(1, Ordering::Release)
572 > Search::MAX_SEARCH_RESULT_FILES
573 || self
574 .matches_count
575 .fetch_add(matched_ranges, Ordering::Release)
576 > Search::MAX_SEARCH_RESULT_RANGES
577 {
578 _ = self.publish_matches.send(SearchResult::LimitReached).await;
579 Some(LimitReached)
580 } else {
581 _ = self
582 .publish_matches
583 .send(SearchResult::Buffer { buffer, ranges })
584 .await;
585 None
586 }
587 }
588 async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
589 _=maybe!(async move {
590 let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
591 let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
592 return anyhow::Ok(());
593 };
594
595 let mut file = BufReader::new(file);
596 let file_start = file.fill_buf()?;
597
598 if let Err(Some(starting_position)) =
599 std::str::from_utf8(file_start).map_err(|e| e.error_len())
600 {
601 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
602 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
603 log::debug!(
604 "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
605 );
606 return Ok(());
607 }
608
609 if self.query.detect(file).unwrap_or(false) {
610 // Yes, we should scan the whole file.
611 entry.should_scan_tx.send(entry.path).await?;
612 }
613 Ok(())
614 }).await;
615 }
616
617 async fn handle_scan_path(&self, req: InputPath) {
618 _ = maybe!(async move {
619 let InputPath {
620 entry,
621
622 snapshot,
623 should_scan_tx,
624 } = req;
625
626 if entry.is_fifo || !entry.is_file() {
627 return Ok(());
628 }
629
630 if self.query.filters_path() {
631 let matched_path = if self.query.match_full_paths() {
632 let mut full_path = snapshot.root_name().as_std_path().to_owned();
633 full_path.push(entry.path.as_std_path());
634 self.query.match_path(&full_path)
635 } else {
636 self.query.match_path(entry.path.as_std_path())
637 };
638 if !matched_path {
639 return Ok(());
640 }
641 }
642
643 if self.open_entries.contains(&entry.id) {
644 // The buffer is already in memory and that's the version we want to scan;
645 // hence skip the dilly-dally and look for all matches straight away.
646 self.get_buffer_for_full_scan_tx
647 .send(ProjectPath {
648 worktree_id: snapshot.id(),
649 path: entry.path.clone(),
650 })
651 .await?;
652 } else {
653 self.confirm_contents_will_match_tx
654 .send(MatchingEntry {
655 should_scan_tx: should_scan_tx,
656 worktree_root: snapshot.abs_path().clone(),
657 path: ProjectPath {
658 worktree_id: snapshot.id(),
659 path: entry.path.clone(),
660 },
661 })
662 .await?;
663 }
664
665 anyhow::Ok(())
666 })
667 .await;
668 }
669}
670
671struct InputPath {
672 entry: Entry,
673 snapshot: Snapshot,
674 should_scan_tx: oneshot::Sender<ProjectPath>,
675}
676
677struct MatchingEntry {
678 worktree_root: Arc<Path>,
679 path: ProjectPath,
680 should_scan_tx: oneshot::Sender<ProjectPath>,
681}