use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use anyhow::Context;
use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
use gpui::{App, AsyncApp, Entity, Task};
use language::{Buffer, BufferSnapshot};
use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};

use util::{ResultExt, maybe};
use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};

use crate::{
    ProjectItem, ProjectPath,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
};

pub(crate) struct Search {
    pub(crate) fs: Arc<dyn Fs>,
    pub(crate) buffer_store: Entity<BufferStore>,
    pub(crate) worktrees: Vec<Entity<Worktree>>,
    pub(crate) limit: usize,
}

/// Represents the results of a project search, allowing one to either obtain match positions or
/// just the handles to buffers that may match the search.
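///
/// Note that the search does not start until one of the two accessors below is called; both
/// invoke the stored `trigger_search` closure. Dropping the handle without calling either
/// means the search never runs, hence the `#[must_use]`.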
#[must_use]
pub(crate) struct SearchResultsHandle {
    results: Receiver<SearchResult>,
    matching_buffers: Receiver<Entity<Buffer>>,
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}

impl SearchResultsHandle {
    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
        (self.trigger_search)(cx).detach();
        self.results
    }

    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
        (self.trigger_search)(cx).detach();
        self.matching_buffers
    }
}

impl Search {
    pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000;
    pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
    /// Prepares a project search run. The returned handle has to be used to specify whether
    /// you're interested in matching buffers or full search results.
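    ///
    /// # Example
    ///
    /// A minimal sketch of driving a search (assumes a `Search` value, a `SearchQuery`, and a
    /// `&mut App` are already in scope):
    ///
    /// ```ignore
    /// let handle = search.into_results(query, cx);
    /// // Either request full results (buffers plus anchor ranges)...
    /// let results = handle.results(cx);
    /// // ...or, alternatively, only the buffers that may contain matches:
    /// // let buffers = handle.matching_buffers(cx);
    /// ```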
    pub(crate) fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                self.limit = self.limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(|cx: &mut App| {
            cx.spawn(async move |cx| {
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);

                let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
                let matches_count = AtomicUsize::new(0);
                let matched_buffer_count = AtomicUsize::new(0);
                let (input_paths_tx, input_paths_rx) = unbounded();
                let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
                let worker_pool = executor.scoped(|scope| {
                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                        bounded(64);

                    let num_cpus = executor.num_cpus();

                    assert!(num_cpus > 0);
                    for _ in 0..num_cpus - 1 {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &open_buffers,
                            matched_buffer_count: &matched_buffer_count,
                            matches_count: &matches_count,
                            fs: &*self.fs,
                            input_paths_rx: input_paths_rx.clone(),
                            confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
                            confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                            get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                            publish_matches: tx.clone(),
                        };
                        scope.spawn(worker.run());
                    }
                    drop(tx);
                    drop(find_all_matches_rx);

                    scope.spawn(Self::maintain_sorted_search_results(
                        sorted_search_results_rx,
                        get_buffer_for_full_scan_tx,
                        self.limit,
                    ))
                });
                let provide_search_paths = cx.spawn(Self::provide_search_paths(
                    std::mem::take(&mut self.worktrees),
                    query.include_ignored(),
                    input_paths_tx,
                    sorted_search_results_tx,
                ));
                let open_buffers = self.open_buffers(
                    get_buffer_for_full_scan_rx,
                    grab_buffer_snapshot_tx,
                    cx.clone(),
                );
                let buffer_snapshots = self.grab_buffer_snapshots(
                    grab_buffer_snapshot_rx,
                    find_all_matches_tx,
                    cx.clone(),
                );
                futures::future::join4(
                    worker_pool,
                    buffer_snapshots,
                    open_buffers,
                    provide_search_paths,
                )
                .await;
            })
        });
        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }

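    /// Feeds every candidate file path from the given worktrees into the search pipeline.
    /// When ignored files are included, unloaded ignored directories are scanned up front
    /// so that their entries appear in the worktree snapshot before it is walked.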
    fn provide_search_paths(
        worktrees: Vec<Entity<Worktree>>,
        include_ignored: bool,
        tx: Sender<InputPath>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> impl AsyncFnOnce(&mut AsyncApp) {
        async move |cx| {
            _ = maybe!(async move {
                for worktree in worktrees {
                    let (mut snapshot, worktree_settings) = worktree
                        .read_with(cx, |this, _| {
                            Some((this.snapshot(), this.as_local()?.settings()))
                        })?
                        .context("The worktree is not local")?;
                    if include_ignored {
                        // Pre-fetch all of the ignored directories as they're going to be searched.
                        let mut entries_to_refresh = vec![];
                        for entry in snapshot.entries(include_ignored, 0) {
                            if entry.is_ignored
                                && entry.kind.is_unloaded()
                                && !worktree_settings.is_path_excluded(&entry.path)
                            {
                                entries_to_refresh.push(entry.path.clone());
                            }
                        }
                        let barrier = worktree.update(cx, |this, _| {
                            let local = this.as_local_mut()?;
                            let barrier = entries_to_refresh
                                .into_iter()
                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
                                .collect::<Vec<_>>();
                            Some(barrier)
                        })?;
                        if let Some(barriers) = barrier {
                            futures::future::join_all(barriers).await;
                        }
                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
                    }
                    cx.background_executor()
                        .scoped(|scope| {
                            scope.spawn(async {
                                for entry in snapshot.files(include_ignored, 0) {
                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
                                    let Ok(_) = tx
                                        .send(InputPath {
                                            entry: entry.clone(),
                                            snapshot: snapshot.clone(),
                                            should_scan_tx,
                                        })
                                        .await
                                    else {
                                        return;
                                    };
                                    if results.send(should_scan_rx).await.is_err() {
                                        return;
                                    };
                                }
                            })
                        })
                        .await;
                }
                anyhow::Ok(())
            })
            .await;
        }
    }

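    /// Keeps search results in discovery order: `rx` yields one oneshot receiver per candidate
    /// path, in the order the paths were found, and each receiver resolves only if its path
    /// turned out to contain a match. Awaiting the receivers in sequence preserves that order,
    /// and the loop stops once `limit` matching paths have been forwarded.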
    async fn maintain_sorted_search_results(
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
        limit: usize,
    ) {
        let mut rx = pin!(rx);
        let mut matched = 0;
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
            matched += 1;
            if matched >= limit {
                break;
            }
        }
    }

    /// Background workers cannot open buffers by themselves, hence the main thread does it on their behalf.
    async fn open_buffers(
        &self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<Entity<Buffer>>,
        mut cx: AsyncApp,
    ) {
        let mut rx = pin!(rx.ready_chunks(64));
        _ = maybe!(async move {
            while let Some(requested_paths) = rx.next().await {
                let mut buffers = self.buffer_store.update(&mut cx, |this, cx| {
                    requested_paths
                        .into_iter()
                        .map(|path| this.open_buffer(path, cx))
                        .collect::<FuturesOrdered<_>>()
                })?;

                while let Some(buffer) = buffers.next().await {
                    if let Some(buffer) = buffer.log_err() {
                        find_all_matches_tx.send(buffer).await?;
                    }
                }
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

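    /// Takes snapshots of buffers on the main thread (reading a buffer requires app context)
    /// and hands them to the background workers for the full-scan stage.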
    async fn grab_buffer_snapshots(
        &self,
        rx: Receiver<Entity<Buffer>>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(buffer) = rx.recv().await {
                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

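/// A single background search worker. Every worker services all stages of the pipeline
/// concurrently; conceptually, a path flows through the stages like this (a simplified
/// sketch of the channel wiring below, with the main-thread hop for opening buffers noted):
///
/// ```text
/// input_paths -> confirm_contents_will_match -> get_buffer_for_full_scan
///                                                        |
///                                                        v  (buffer opened on main thread)
///                     publish_matches <- find_all_matches
/// ```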
struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    fs: &'search dyn Fs,
    /// Start off with all paths in the project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents).
    input_paths_rx: Receiver<InputPath>,

    /// After that, if the buffer is not yet loaded, we figure out whether it contains at least one match
    /// based on its contents on disk. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for the rest of the
    /// matches (and figure out their ranges). But first, we need to go back to the main thread to open
    /// a buffer (and create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in the background: run a full scan and find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
    async fn run(mut self) {
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: self.fs,
                matched_buffer_count: self.matched_buffer_count,
                matches_count: self.matches_count,
                confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
                publish_matches: &self.publish_matches,
            };
            // Whenever we notice that some step of the pipeline is closed, we don't want to close
            // subsequent steps straight away. Another worker might be about to produce a value that
            // will be pushed there, thus we'll replace the current worker's pipe with a dummy one.
            // That way, we'll only ever close a next-stage channel when ALL workers do so.
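            // For instance, `bounded(1).0` yields a sender whose receiver has already been
            // dropped, so only this worker's sends start failing, while the other workers'
            // clones of the original sender keep the downstream stage alive.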
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    if self.publish_matches.is_closed() {
                        break;
                    }
                    let Some(matches) = find_all_matches else {
                        self.publish_matches = bounded(1).0;
                        continue;
                    };
                    let result = handler.handle_find_all_matches(matches).await;
                    if let Some(_should_bail) = result {
                        self.publish_matches = bounded(1).0;
                        continue;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        self.get_buffer_for_full_scan_tx = bounded(1).0;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this is not producing values, close the upstream.
                        self.confirm_contents_will_match_tx = bounded(1).0;
                    }
                },
                complete => {
                    break;
                },
            }
        }
    }
}

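/// Borrowed, per-iteration view of a `Worker`'s state, with one `handle_*` method per
/// pipeline stage.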
struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

struct LimitReached;

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
    ) -> Option<LimitReached> {
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release)
            > Search::MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > Search::MAX_SEARCH_RESULT_RANGES
        {
            _ = self.publish_matches.send(SearchResult::LimitReached).await;
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }

    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file contents, throw away files that have invalid
                // UTF-8 sequences early on; that way we can still match files in a streaming fashion
                // without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                return Ok(());
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

    async fn handle_scan_path(&self, req: InputPath) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                snapshot,
                should_scan_tx,
            } = req;

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

struct InputPath {
    entry: Entry,
    snapshot: Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}