use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use anyhow::Context;
use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
use gpui::{App, AppContext, AsyncApp, Entity, Task};
use language::{Buffer, BufferSnapshot};
use parking_lot::Mutex;
use postage::oneshot;
use rpc::{AnyProtoClient, proto};
use smol::{
    channel::{Receiver, Sender, bounded, unbounded},
    future::FutureExt,
};

use text::BufferId;
use util::{ResultExt, maybe};
use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};

use crate::{
    Project, ProjectItem, ProjectPath, RemotelyCreatedModels,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
    worktree_store::WorktreeStore,
};

pub(crate) struct Search {
    pub(crate) fs: Arc<dyn Fs>,
    pub(crate) buffer_store: Entity<BufferStore>,
    pub(crate) worktree_store: Entity<WorktreeStore>,
    pub(crate) worktrees: Vec<Entity<Worktree>>,
    pub(crate) limit: usize,
    pub(crate) client: Option<(AnyProtoClient, u64)>,
    pub(crate) remotely_created_models: Arc<Mutex<RemotelyCreatedModels>>,
}

/// Represents the results of a project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search.
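///
/// A rough usage sketch (not compiled as a doctest; assumes a `handle` obtained from `Search::into_results`):
///
/// ```ignore
/// // Each accessor consumes the handle and kicks off the underlying search task,
/// // so a single run yields exactly one of the two streams.
/// let buffers = handle.matching_buffers(cx);
/// while let Ok(buffer) = buffers.recv().await {
///     // `buffer` is an `Entity<Buffer>` that may contain at least one match.
/// }
/// ```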
#[must_use]
pub(crate) struct SearchResultsHandle {
    results: Receiver<SearchResult>,
    matching_buffers: Receiver<Entity<Buffer>>,
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}

impl SearchResultsHandle {
    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
        (self.trigger_search)(cx).detach();
        self.results
    }

    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
        (self.trigger_search)(cx).detach();
        self.matching_buffers
    }
}

#[derive(Clone)]
enum FindSearchCandidates {
    Local {
        /// Start off with all paths in the project and filter them based on:
        /// - Include filters
        /// - Exclude filters
        /// - Only open buffers
        /// - Scan ignored files
        ///
        /// Put another way: filter out files that can't match (without looking at file contents).
        input_paths_rx: Receiver<InputPath>,
        /// After that, if the buffer is not yet loaded, we'll figure out whether it contains at least one match
        /// based on its contents on disk. This step is not performed for buffers we already have in memory.
        confirm_contents_will_match_tx: Sender<MatchingEntry>,
        confirm_contents_will_match_rx: Receiver<MatchingEntry>,
        /// Of those that contain at least one match (or are already in memory), look for the rest of the matches (and figure out their ranges).
        /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
        get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    },
    Remote,
}

impl Search {
    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

    /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
    /// or full search results.
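    ///
    /// A hedged sketch of the intended call pattern (not compiled; construction of `Search` and `query` elided):
    ///
    /// ```ignore
    /// let results = search.into_results(query, cx).results(cx);
    /// while let Ok(result) = results.recv().await {
    ///     match result {
    ///         SearchResult::Buffer { buffer, ranges } => { /* `ranges` are anchors into `buffer` */ }
    ///         SearchResult::LimitReached => break,
    ///     }
    /// }
    /// ```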
    pub(crate) fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers are always searched, so each one counts against the limit.
                self.limit = self.limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(|cx: &mut App| {
            cx.spawn(async move |cx| {
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);

                let (candidate_searcher, tasks) = if let Some((client, remote_id)) = self.client {
                    let request = client.request(proto::FindSearchCandidates {
                        project_id: remote_id,
                        query: Some(query.to_proto()),
                        limit: self.limit as _,
                    });
                    let Ok(guard) = cx.update(|cx| {
                        Project::retain_remotely_created_models_impl(
                            &self.remotely_created_models,
                            &self.buffer_store,
                            &self.worktree_store,
                            cx,
                        )
                    }) else {
                        return;
                    };
                    let buffer_store = self.buffer_store.downgrade();
                    let issue_remote_buffers_request = cx
                        .spawn(async move |cx| {
                            let _ = maybe!(async move {
                                let response = request.await?;

                                for buffer_id in response.buffer_ids {
                                    let buffer_id = BufferId::new(buffer_id)?;
                                    let buffer = buffer_store
                                        .update(cx, |buffer_store, cx| {
                                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
                                        })?
                                        .await?;
                                    let _ = grab_buffer_snapshot_tx.send(buffer).await;
                                }

                                drop(guard);
                                anyhow::Ok(())
                            })
                            .await
                            .log_err();
                        })
                        .boxed_local();
                    (
                        FindSearchCandidates::Remote,
                        vec![issue_remote_buffers_request],
                    )
                } else {
                    let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                        bounded(64);
                    let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();

                    let (input_paths_tx, input_paths_rx) = unbounded();
                    let tasks = vec![
                        cx.spawn(Self::provide_search_paths(
                            std::mem::take(&mut self.worktrees),
                            query.include_ignored(),
                            input_paths_tx,
                            sorted_search_results_tx,
                        ))
                        .boxed_local(),
                        self.open_buffers(
                            get_buffer_for_full_scan_rx,
                            grab_buffer_snapshot_tx,
                            cx.clone(),
                        )
                        .boxed_local(),
                        cx.background_spawn(Self::maintain_sorted_search_results(
                            sorted_search_results_rx,
                            get_buffer_for_full_scan_tx.clone(),
                            self.limit,
                        ))
                        .boxed_local(),
                    ];
                    (
                        FindSearchCandidates::Local {
                            get_buffer_for_full_scan_tx,
                            confirm_contents_will_match_tx,
                            confirm_contents_will_match_rx,
                            input_paths_rx,
                        },
                        tasks,
                    )
                };

                let matches_count = AtomicUsize::new(0);
                let matched_buffer_count = AtomicUsize::new(0);

                let worker_pool = executor.scoped(|scope| {
                    let num_cpus = executor.num_cpus();

                    assert!(num_cpus > 0);
                    for _ in 0..num_cpus - 1 {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &open_buffers,
                            matched_buffer_count: &matched_buffer_count,
                            matches_count: &matches_count,
                            fs: &*self.fs,
                            candidates: candidate_searcher.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                            publish_matches: tx.clone(),
                        };
                        scope.spawn(worker.run());
                    }
                    drop(tx);
                    drop(find_all_matches_rx);
                    drop(candidate_searcher);
                });

                let buffer_snapshots = Self::grab_buffer_snapshots(
                    grab_buffer_snapshot_rx,
                    find_all_matches_tx,
                    cx.clone(),
                );
                futures::future::join_all(
                    [worker_pool.boxed_local(), buffer_snapshots.boxed_local()]
                        .into_iter()
                        .chain(tasks),
                )
                .await;
            })
        });

        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }

    fn provide_search_paths(
        worktrees: Vec<Entity<Worktree>>,
        include_ignored: bool,
        tx: Sender<InputPath>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> impl AsyncFnOnce(&mut AsyncApp) {
        async move |cx| {
            _ = maybe!(async move {
                for worktree in worktrees {
                    let (mut snapshot, worktree_settings) = worktree
                        .read_with(cx, |this, _| {
                            Some((this.snapshot(), this.as_local()?.settings()))
                        })?
                        .context("The worktree is not local")?;
                    if include_ignored {
                        // Pre-fetch all of the ignored directories as they're going to be searched.
                        let mut entries_to_refresh = vec![];
                        for entry in snapshot.entries(include_ignored, 0) {
                            if entry.is_ignored
                                && entry.kind.is_unloaded()
                                && !worktree_settings.is_path_excluded(&entry.path)
                            {
                                entries_to_refresh.push(entry.path.clone());
                            }
                        }
                        let barrier = worktree.update(cx, |this, _| {
                            let local = this.as_local_mut()?;
                            let barrier = entries_to_refresh
                                .into_iter()
                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
                                .collect::<Vec<_>>();
                            Some(barrier)
                        })?;
                        if let Some(barriers) = barrier {
                            futures::future::join_all(barriers).await;
                        }
                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
                    }
                    cx.background_executor()
                        .scoped(|scope| {
                            scope.spawn(async {
                                for entry in snapshot.files(include_ignored, 0) {
                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
                                    let Ok(_) = tx
                                        .send(InputPath {
                                            entry: entry.clone(),
                                            snapshot: snapshot.clone(),
                                            should_scan_tx,
                                        })
                                        .await
                                    else {
                                        return;
                                    };
                                    if results.send(should_scan_rx).await.is_err() {
                                        return;
                                    };
                                }
                            })
                        })
                        .await;
                }
                anyhow::Ok(())
            })
            .await;
        }
    }

    async fn maintain_sorted_search_results(
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
        limit: usize,
    ) {
        let mut rx = pin!(rx);
        let mut matched = 0;
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
            matched += 1;
            if matched >= limit {
                break;
            }
        }
    }

    /// Background workers cannot open buffers by themselves, hence the main thread will do it on their behalf.
    async fn open_buffers(
        &self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<Entity<Buffer>>,
        mut cx: AsyncApp,
    ) {
        let mut rx = pin!(rx.ready_chunks(64));
        _ = maybe!(async move {
            while let Some(requested_paths) = rx.next().await {
                let mut buffers = self.buffer_store.update(&mut cx, |this, cx| {
                    requested_paths
                        .into_iter()
                        .map(|path| this.open_buffer(path, cx))
                        .collect::<FuturesOrdered<_>>()
                })?;

                while let Some(buffer) = buffers.next().await {
                    if let Some(buffer) = buffer.log_err() {
                        find_all_matches_tx.send(buffer).await?;
                    }
                }
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

    async fn grab_buffer_snapshots(
        rx: Receiver<Entity<Buffer>>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(buffer) = rx.recv().await {
                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    fs: &'search dyn Fs,

    candidates: FindSearchCandidates,
    /// Ok, we're back in the background: run a full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
    async fn run(mut self) {
        let (
            input_paths_rx,
            confirm_contents_will_match_rx,
            mut confirm_contents_will_match_tx,
            mut get_buffer_for_full_scan_tx,
        ) = match self.candidates {
            FindSearchCandidates::Local {
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx,
            } => (
                input_paths_rx,
                confirm_contents_will_match_rx,
                confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx,
            ),
            FindSearchCandidates::Remote => {
                (unbounded().1, unbounded().1, unbounded().0, unbounded().0)
            }
        };
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: self.fs,
                matched_buffer_count: self.matched_buffer_count,
                matches_count: self.matches_count,
                confirm_contents_will_match_tx: &confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx,
                publish_matches: &self.publish_matches,
            };
            // Whenever we notice that some step of the pipeline is closed, we don't want to close the subsequent
            // steps straight away. Another worker might be about to produce a value that will
            // be pushed there, thus we'll replace the current worker's pipe with a dummy one.
            // That way, we'll only ever close a next-stage channel when ALL workers do so.
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    if self.publish_matches.is_closed() {
                        break;
                    }
                    let Some(matches) = find_all_matches else {
                        self.publish_matches = bounded(1).0;
                        continue;
                    };
                    let result = handler.handle_find_all_matches(matches).await;
                    if let Some(_should_bail) = result {
                        self.publish_matches = bounded(1).0;
                        continue;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        get_buffer_for_full_scan_tx = bounded(1).0;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this is not producing values, close the upstream.
                        confirm_contents_will_match_tx = bounded(1).0;
                    }
                }
                complete => {
                    break
                },
            }
        }
    }
}

struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

struct LimitReached;

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
    ) -> Option<LimitReached> {
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
            > Search::MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > Search::MAX_SEARCH_RESULT_RANGES
        {
            _ = self.publish_matches.send(SearchResult::LimitReached).await;
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }
    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file contents, throw away files that have invalid UTF-8 sequences early on;
                // that way we can still match files in a streaming fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                return Ok(());
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

    async fn handle_scan_path(&self, req: InputPath) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                snapshot,
                should_scan_tx,
            } = req;

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

struct InputPath {
    entry: Entry,
    snapshot: Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}