use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased};
use gpui::{App, AsyncApp, Entity, Task};
use language::{Buffer, BufferSnapshot};
use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};

use util::{ResultExt, maybe};
use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};

use crate::{
    ProjectItem, ProjectPath,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
};

pub(crate) struct Search {
    pub(crate) fs: Arc<dyn Fs>,
    pub(crate) buffer_store: Entity<BufferStore>,
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
}

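// Caps on how much a single search reports. Once either cap is exceeded, the
// pipeline stops publishing per-buffer results and emits
// `SearchResult::LimitReached` as the final message.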
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

/// Represents the results of a project search and allows the caller to obtain either
/// the match positions or just the handles to buffers that may contain matches.
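///
/// A sketch of a call site (hypothetical `search` and `query` values built elsewhere):
/// ```ignore
/// let handle = search.into_results(query, cx);
/// let results = handle.results(cx); // kicks off the search
/// while let Ok(result) = results.recv().await {
///     // ... consume `SearchResult`s as they arrive
/// }
/// ```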
#[must_use]
pub(crate) struct SearchResultsHandle {
    results: Receiver<SearchResult>,
    matching_buffers: Receiver<Entity<Buffer>>,
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}

impl SearchResultsHandle {
    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
        (self.trigger_search)(cx).detach();
        self.results
    }
    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
        (self.trigger_search)(cx).detach();
        self.matching_buffers
    }
}

impl Search {
    /// Prepares a project search run. The returned handle must be used to specify whether
    /// you're interested in matching buffers or in full search results.
    pub(crate) fn into_results(self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // limit = limit.saturating_sub(1); todo!()
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
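        // The search is a multi-stage pipeline of channels, roughly:
        //   input_paths -> confirm_contents_will_match -> get_buffer_for_full_scan
        //     -> grab_buffer_snapshot -> find_all_matches -> publish_matches
        // Path filtering, first-match confirmation and full scans run on background
        // workers; opening buffers and snapshotting them happens on the main thread.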
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(|cx: &mut App| {
            cx.spawn(async move |cx| {
                for buffer in unnamed_buffers {
                    _ = grab_buffer_snapshot_tx.send(buffer).await;
                }

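                // Bounded so that snapshotting (and, transitively, buffer opening on
                // the main thread) cannot race far ahead of the workers that consume
                // the snapshots.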
                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);

                let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
                let matches_count = AtomicUsize::new(0);
                let matched_buffer_count = AtomicUsize::new(0);
                let worker_pool = executor.scoped(|scope| {
                    let (input_paths_tx, input_paths_rx) = bounded(64);
                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                        bounded(64);
                    let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                    for _ in 0..executor.num_cpus() {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &open_buffers,
                            matched_buffer_count: &matched_buffer_count,
                            matches_count: &matches_count,
                            fs: &*self.fs,
                            input_paths_rx: input_paths_rx.clone(),
                            confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
                            confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                            get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                            publish_matches: tx.clone(),
                        };
                        scope.spawn(worker.run());
                    }
                    scope.spawn(self.provide_search_paths(
                        &query,
                        input_paths_tx,
                        sorted_search_results_tx,
                    ));
                    scope.spawn(self.maintain_sorted_search_results(
                        sorted_search_results_rx,
                        get_buffer_for_full_scan_tx,
                    ))
                });
                let open_buffers = self.open_buffers(
                    get_buffer_for_full_scan_rx,
                    grab_buffer_snapshot_tx,
                    cx.clone(),
                );
                let buffer_snapshots = self.grab_buffer_snapshots(
                    grab_buffer_snapshot_rx,
                    find_all_matches_tx,
                    cx.clone(),
                );
                futures::future::join3(worker_pool, buffer_snapshots, open_buffers).await;

                let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
                    || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
                if limit_reached {
                    _ = tx.send(SearchResult::LimitReached).await;
                }
            })
        });
        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }

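    /// Feeds every candidate worktree entry to the workers, paired with a oneshot
    /// that a worker resolves if the entry's contents turn out to contain a match
    /// (a dropped oneshot means "no match"). The oneshot receivers are forwarded
    /// downstream in discovery order, which keeps full scans deterministically ordered.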
    async fn provide_search_paths<'this>(
        &'this self,
        query: &SearchQuery,
        tx: Sender<InputPath<'this>>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) {
        for (snapshot, worktree_settings) in &self.snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                let (should_scan_tx, should_scan_rx) = oneshot::channel();
                let Ok(_) = tx
                    .send(InputPath {
                        entry,
                        settings: worktree_settings,
                        snapshot,
                        should_scan_tx,
                    })
                    .await
                else {
                    return;
                };
                if results.send(should_scan_rx).await.is_err() {
                    return;
                };
            }
        }
    }

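    /// Awaits the per-path oneshots in the order their paths were discovered, so
    /// full-scan requests are issued in a deterministic order regardless of which
    /// worker confirmed a given match first.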
    async fn maintain_sorted_search_results(
        &self,
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
    ) {
        let mut rx = pin!(rx);
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
        }
    }


    /// Background workers cannot open buffers by themselves, hence the main thread does it on their behalf.
    async fn open_buffers(
        &self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<Entity<Buffer>>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(requested_path) = rx.recv().await {
                let Some(buffer) = self
                    .buffer_store
                    .update(&mut cx, |this, cx| this.open_buffer(requested_path, cx))?
                    .await
                    .log_err()
                else {
                    continue;
                };
                find_all_matches_tx.send(buffer).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

    async fn grab_buffer_snapshots<'a>(
        &'a self,
        rx: Receiver<Entity<Buffer>>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(buffer) = rx.recv().await {
                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    fs: &'search dyn Fs,
    /// Start off with all paths in the project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents).
    input_paths_rx: Receiver<InputPath<'search>>,

    /// After that, if the buffer is not yet loaded, figure out whether it contains at least one
    /// match based on its contents on disk. This step is skipped for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// For files that contain at least one match (or are already in memory), look for the rest of
    /// the matches (and figure out their ranges). But wait: first we need to go back to the main
    /// thread to open a buffer (and create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in the background: run the full scan and find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
    async fn run(mut self) {
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());

        loop {
            let handler = RequestHandler {
                query: self.query,
                open_entries: &self.open_buffers,
                fs: self.fs,
                matched_buffer_count: self.matched_buffer_count,
                matches_count: self.matches_count,
                confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
                get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
                publish_matches: &self.publish_matches,
            };
            // Whenever we notice that some step of the pipeline is closed, we don't want to close
            // subsequent steps straight away: another worker might be about to produce a value for
            // them. Instead, we replace the current worker's sender with a dummy one, so a
            // next-stage channel only closes once ALL workers have dropped their senders.
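            // `select_biased!` polls branches in the order they are written, so
            // finishing in-flight full scans takes priority over confirming first
            // matches, which takes priority over pulling in fresh paths; the deepest
            // pipeline stage drains first.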
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    let Some(matches) = find_all_matches else {
                        self.publish_matches = bounded(1).0;
                        continue;
                    };
                    let result = handler.handle_find_all_matches(matches).await;
                    if let Some(_should_bail) = result {
                        self.publish_matches = bounded(1).0;
                        break;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        self.get_buffer_for_full_scan_tx = bounded(1).0;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        // If we're the last worker to notice that this stage is done, close the
                        // next one.
                        self.confirm_contents_will_match_tx = bounded(1).0;
                    }
                },
                complete => {
                    break
                },
            }
        }
    }
}

struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

struct LimitReached;

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
    ) -> Option<LimitReached> {
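        // Resolve the matches to anchors so the ranges stay valid even if the
        // buffer is edited while results are being consumed.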
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

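        // `fetch_add` returns the previous value, so these comparisons check the
        // totals from *before* this buffer's matches were accounted for.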
        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > MAX_SEARCH_RESULT_RANGES
        {
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }
    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
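            // Peek at the reader's first buffered chunk without consuming it.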
            let file_start = file.fill_buf()?;

            if let Err(Some(invalid_sequence_len)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file contents, throw away files with invalid
                // UTF-8 sequences early on; that way we can still match files in a streaming
                // fashion without having to look at "obviously binary" files. Note that
                // `error_len() == None` merely means the chunk ended in the middle of a
                // multi-byte character, so such files are still searched.
                log::debug!(
                    "Invalid UTF-8 sequence of {invalid_sequence_len} bytes in file {abs_path:?}"
                );
                return Ok(());
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

    async fn handle_scan_path(&self, req: InputPath<'_>) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                settings,
                snapshot,
                should_scan_tx,
            } = req;
            if entry.is_dir() && entry.is_ignored {
                if !settings.is_path_excluded(&entry.path) {
                    // Self::scan_ignored_dir(
                    //     self.fs,
                    //     &snapshot,
                    //     &entry.path,
                    //     self.query,
                    //     &filter_tx,
                    //     &output_tx,
                    // )
                    // .await?;
                }
                return Ok(());
            }

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

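/// A worktree entry flowing into the first pipeline stage, before any cheap
/// (path-based) filtering has been applied.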
struct InputPath<'worker> {
    entry: &'worker Entry,
    settings: &'worker WorktreeSettings,
    snapshot: &'worker Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

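/// A path that passed the path-based filters and now needs its on-disk contents
/// checked for at least one match.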
struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}