use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use anyhow::Context;
use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased};
use gpui::{App, AsyncApp, Entity, Task};
use language::{Buffer, BufferSnapshot};
use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};

use util::{ResultExt, maybe};
use worktree::{Entry, ProjectEntryId, Snapshot, Worktree};

use crate::{
    ProjectItem, ProjectPath,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
};

pub(crate) struct Search {
    pub(crate) fs: Arc<dyn Fs>,
    pub(crate) buffer_store: Entity<BufferStore>,
    pub(crate) worktrees: Vec<Entity<Worktree>>,
}

const MAX_SEARCH_RESULT_FILES: usize = 5_000;
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
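
// Once either limit is exceeded, workers stop publishing further results
// (see `RequestHandler::handle_find_all_matches`).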

/// Represents the results of a project search, and allows one to either obtain the match
/// positions OR just the handles to buffers that may match the search.
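///
/// The search does not start until one of the accessors on the handle is called; dropping
/// the handle without calling either one means the search never runs.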
#[must_use]
pub(crate) struct SearchResultsHandle {
    results: Receiver<SearchResult>,
    matching_buffers: Receiver<Entity<Buffer>>,
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}

impl SearchResultsHandle {
    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
        (self.trigger_search)(cx).detach();
        self.results
    }

    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
        (self.trigger_search)(cx).detach();
        self.matching_buffers
    }
}

impl Search {
    /// Prepares a project search run. The returned handle must be consumed to specify whether
    /// you're interested in the matching buffers or in the full search results.
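    ///
    /// A minimal sketch of the two consumption modes (the `search` and `query` bindings are
    /// illustrative, not part of this module):
    ///
    /// ```ignore
    /// // Full results, including the match ranges within each buffer:
    /// let results = search.into_results(query, cx).results(cx);
    /// // Or: just the handles to buffers that may contain a match:
    /// // let buffers = search.into_results(query, cx).matching_buffers(cx);
    /// ```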
    pub(crate) fn into_results(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // TODO: count unnamed buffers against the search limits as well.
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(|cx: &mut App| {
            cx.spawn(async move |cx| {
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);
                let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
                let matches_count = AtomicUsize::new(0);
                let matched_buffer_count = AtomicUsize::new(0);
                let (input_paths_tx, input_paths_rx) = unbounded();
                let (sorted_search_results_tx, sorted_search_results_rx) = unbounded();
                let worker_pool = executor.scoped(|scope| {
                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                        bounded(64);
                    let num_cpus = executor.num_cpus();
                    assert!(num_cpus > 0);
                    // Reserve one slot for the sorted-results maintainer spawned below.
                    for _ in 0..num_cpus - 1 {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &open_buffers,
                            matched_buffer_count: &matched_buffer_count,
                            matches_count: &matches_count,
                            fs: &*self.fs,
                            input_paths_rx: input_paths_rx.clone(),
                            confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
                            confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                            get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                            publish_matches: tx.clone(),
                        };
                        scope.spawn(worker.run());
                    }
                    // Drop the original sender so the results channel closes once all workers do.
                    drop(tx);

                    scope.spawn(Self::maintain_sorted_search_results(
                        sorted_search_results_rx,
                        get_buffer_for_full_scan_tx,
                    ))
                });
                let provide_search_paths = cx.spawn(Self::provide_search_paths(
                    std::mem::take(&mut self.worktrees),
                    query.include_ignored(),
                    input_paths_tx,
                    sorted_search_results_tx,
                ));
                let open_buffers = self.open_buffers(
                    get_buffer_for_full_scan_rx,
                    grab_buffer_snapshot_tx,
                    cx.clone(),
                );
                let buffer_snapshots = self.grab_buffer_snapshots(
                    grab_buffer_snapshot_rx,
                    find_all_matches_tx,
                    cx.clone(),
                );
                futures::future::join4(
                    worker_pool,
                    buffer_snapshots,
                    open_buffers,
                    provide_search_paths,
                )
                .await;
            })
        });
        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }

    fn provide_search_paths(
        worktrees: Vec<Entity<Worktree>>,
        include_ignored: bool,
        tx: Sender<InputPath>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> impl AsyncFnOnce(&mut AsyncApp) {
        async move |cx| {
            _ = maybe!(async move {
                for worktree in worktrees {
                    let (mut snapshot, worktree_settings) = worktree
                        .read_with(cx, |this, _| {
                            Some((this.snapshot(), this.as_local()?.settings()))
                        })?
                        .context("The worktree is not local")?;
                    if include_ignored {
                        // Pre-fetch all of the ignored directories, as they're about to be searched.
                        let mut entries_to_refresh = vec![];
                        for entry in snapshot.entries(include_ignored, 0) {
                            if entry.is_ignored
                                && entry.kind.is_unloaded()
                                && !worktree_settings.is_path_excluded(&entry.path)
                            {
                                entries_to_refresh.push(entry.path.clone());
                            }
                        }
                        let barriers = worktree.update(cx, |this, _| {
                            let local = this.as_local_mut()?;
                            let barriers = entries_to_refresh
                                .into_iter()
                                .map(|path| local.add_path_prefix_to_scan(path).into_future())
                                .collect::<Vec<_>>();
                            Some(barriers)
                        })?;
                        if let Some(barriers) = barriers {
                            futures::future::join_all(barriers).await;
                        }
                        // Re-grab the snapshot so it includes the freshly-loaded entries.
                        snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
                    }
                    cx.background_executor()
                        .scoped(|scope| {
                            scope.spawn(async {
                                for entry in snapshot.files(include_ignored, 0) {
                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
                                    let Ok(_) = tx
                                        .send(InputPath {
                                            entry: entry.clone(),
                                            snapshot: snapshot.clone(),
                                            should_scan_tx,
                                        })
                                        .await
                                    else {
                                        return;
                                    };
                                    if results.send(should_scan_rx).await.is_err() {
                                        return;
                                    };
                                }
                            })
                        })
                        .await;
                }
                anyhow::Ok(())
            })
            .await;
        }
    }

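    /// Forwards matched paths to the full-scan stage in discovery order: `rx` yields one oneshot
    /// receiver per candidate path, in worktree traversal order, so awaiting them one at a time
    /// keeps the published results sorted even though the workers confirm matches concurrently.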
    async fn maintain_sorted_search_results(
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
    ) {
        let mut rx = pin!(rx);
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
        }
    }

    /// Background workers cannot open buffers by themselves, so the main thread does it on their behalf.
    async fn open_buffers(
        &self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<Entity<Buffer>>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(requested_path) = rx.recv().await {
                let Some(buffer) = self
                    .buffer_store
                    .update(&mut cx, |this, cx| this.open_buffer(requested_path, cx))?
                    .await
                    .log_err()
                else {
                    continue;
                };
                find_all_matches_tx.send(buffer).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

    async fn grab_buffer_snapshots(
        &self,
        rx: Receiver<Entity<Buffer>>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(buffer) = rx.recv().await {
                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    fs: &'search dyn Fs,
    /// Start off with all paths in the project and filter them based on:
    /// - include filters,
    /// - exclude filters,
    /// - only open buffers,
    /// - scan ignored files.
    ///
    /// Put another way: filter out files that can't match (without looking at file contents).
    input_paths_rx: Receiver<InputPath>,

    /// After that, if the buffer is not yet loaded, we'll figure out whether it contains at least one match
    /// based on the on-disk contents of the buffer. This step is skipped for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// For those that contain at least one match (or are already in memory), look for the rest of the matches
    /// (and figure out their ranges). But wait: first, we need to go back to the main thread to open a buffer
    /// (and create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// OK, we're back in the background: run the full scan and find all matches in the given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}
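
// A rough sketch of the pipeline described by the fields above (each arrow is a channel; the
// "open the buffer" hop runs on the main thread, everything else in background workers):
//
//   input_paths_rx -> confirm_contents_will_match -> get_buffer_for_full_scan
//       -> (main thread opens the buffer and grabs a snapshot) -> find_all_matches
//       -> publish_matches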

impl Worker<'_> {
    async fn run(mut self) {
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: self.fs,
                matched_buffer_count: self.matched_buffer_count,
                matches_count: self.matches_count,
                confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
                publish_matches: &self.publish_matches,
            };
            // Whenever we notice that some step of the pipeline is closed, we don't want to close
            // the subsequent steps straight away. Another worker might be about to produce a value
            // that will be pushed there; thus, we'll replace the current worker's pipe with a dummy
            // one. That way, we'll only ever close a next-stage channel when ALL workers do so.
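            // (Assigning `bounded(1).0` drops this worker's clone of the real sender; the receiver
            // half of the dummy channel is discarded immediately, so later sends to it just fail.)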
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    if self.publish_matches.is_closed() {
                        break;
                    }
                    let Some(matches) = find_all_matches else {
                        self.publish_matches = bounded(1).0;
                        continue;
                    };
                    if let Some(LimitReached) = handler.handle_find_all_matches(matches).await {
                        self.publish_matches = bounded(1).0;
                        continue;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        self.get_buffer_for_full_scan_tx = bounded(1).0;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this stage is not producing
                        // values, this closes the next stage's channel.
                        self.confirm_contents_will_match_tx = bounded(1).0;
                    }
                },
                complete => {
                    break;
                },
            }
        }
    }
}

struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

struct LimitReached;

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
    ) -> Option<LimitReached> {
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > MAX_SEARCH_RESULT_RANGES
        {
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }

    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(error) = std::str::from_utf8(file_start) {
                if error.error_len().is_some() {
                    // Before attempting to match the file contents, throw away files that contain
                    // a definitively invalid UTF-8 sequence (as opposed to a multi-byte character
                    // that is merely cut off at the end of the buffer). That way we can still match
                    // files in a streaming fashion without having to look at "obviously binary" files.
                    log::debug!(
                        "Invalid UTF-8 sequence in file {abs_path:?} at byte position {}",
                        error.valid_up_to()
                    );
                    return Ok(());
                }
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

    async fn handle_scan_path(&self, req: InputPath) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                snapshot,
                should_scan_tx,
            } = req;

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory, and that's the version we want to scan;
                // hence, skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

struct InputPath {
    entry: Entry,
    snapshot: Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}