use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased};
use gpui::{App, AsyncApp, Entity, Task};
use language::{Buffer, BufferSnapshot};
use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};

use util::{ResultExt, maybe};
use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};

use crate::{
    ProjectItem, ProjectPath,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
};
pub(crate) struct Search {
    pub(crate) fs: Arc<dyn Fs>,
    pub(crate) buffer_store: Entity<BufferStore>,
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
    pub(crate) open_buffers: HashSet<ProjectEntryId>,
}

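// Caps on how much a single project search reports; once either limit is exceeded, the search
// winds down and `SearchResult::LimitReached` is emitted instead of further per-buffer results.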
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

/// Represents the results of a project search and allows one to either obtain match positions OR
/// just the handles to buffers that may match the search.
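///
/// A rough usage sketch (assumed, not taken from a test; `search` is a `Search`, `query` a
/// `SearchQuery`, and `cx` an `&mut App`): trigger the search and drain one of the two receivers
/// from an async context.
///
/// ```ignore
/// let handle = search.into_results(query, cx);
/// let results = handle.results(cx);
/// while let Ok(result) = results.recv().await {
///     match result {
///         SearchResult::Buffer { buffer, ranges } => { /* consume the matches */ }
///         SearchResult::LimitReached => break,
///     }
/// }
/// ```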
#[must_use]
pub(crate) struct SearchResultsHandle {
    results: Receiver<SearchResult>,
    matching_buffers: Receiver<Entity<Buffer>>,
    trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
}

impl SearchResultsHandle {
    pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
        (self.trigger_search)(cx).detach();
        self.results
    }
    pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
        (self.trigger_search)(cx).detach();
        self.matching_buffers
    }
}

impl Search {
    /// Prepares a project search run. The returned handle has to be used to specify whether you're
    /// interested in just the matching buffers or in the full search results.
    pub(crate) fn into_results(self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // limit = limit.saturating_sub(1); todo!()
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) =
            bounded(MAX_CONCURRENT_BUFFER_OPENS);
        let matching_buffers = grab_buffer_snapshot_rx.clone();
        let trigger_search = Box::new(|cx: &mut App| {
            cx.spawn(async move |cx| {
                let (find_all_matches_tx, find_all_matches_rx) =
                    bounded(MAX_CONCURRENT_BUFFER_OPENS);

                let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
                let matches_count = AtomicUsize::new(0);
                let matched_buffer_count = AtomicUsize::new(0);
                let worker_pool = executor.scoped(|scope| {
                    let (input_paths_tx, input_paths_rx) = bounded(64);
                    let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
                        bounded(64);
                    let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                    for _ in 0..executor.num_cpus() {
                        let worker = Worker {
                            query: &query,
                            open_buffers: &self.open_buffers,
                            matched_buffer_count: &matched_buffer_count,
                            matches_count: &matches_count,
                            fs: &*self.fs,
                            input_paths_rx: input_paths_rx.clone(),
                            confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
                            confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                            get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                            find_all_matches_rx: find_all_matches_rx.clone(),
                            publish_matches: tx.clone(),
                        };
                        scope.spawn(worker.run());
                    }
                    scope.spawn(self.provide_search_paths(
                        &query,
                        input_paths_tx,
                        sorted_search_results_tx,
                    ));
                    scope.spawn(self.maintain_sorted_search_results(
                        sorted_search_results_rx,
                        get_buffer_for_full_scan_tx,
                    ))
                });
                let open_buffers = self.open_buffers(
                    get_buffer_for_full_scan_rx,
                    grab_buffer_snapshot_tx,
                    cx.clone(),
                );
                let buffer_snapshots = self.grab_buffer_snapshots(
                    grab_buffer_snapshot_rx,
                    find_all_matches_tx,
                    cx.clone(),
                );
                futures::future::join3(worker_pool, buffer_snapshots, open_buffers).await;

                let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
                    || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
                if limit_reached {
                    _ = tx.send(SearchResult::LimitReached).await;
                }
            })
        });
        SearchResultsHandle {
            results: rx,
            matching_buffers,
            trigger_search,
        }
    }

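    /// Feeds every worktree entry to the workers, in worktree order. For each entry, a oneshot
    /// channel is handed both to the workers (via `InputPath::should_scan_tx`) and, in the same
    /// order, to `maintain_sorted_search_results`; this is what keeps the eventual results sorted
    /// even though the per-path checks run concurrently.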
    async fn provide_search_paths<'this>(
        &'this self,
        query: &SearchQuery,
        tx: Sender<InputPath<'this>>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) {
        for (snapshot, worktree_settings) in &self.snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                let (should_scan_tx, should_scan_rx) = oneshot::channel();
                let Ok(_) = tx
                    .send(InputPath {
                        entry,
                        settings: worktree_settings,
                        snapshot,
                        should_scan_tx,
                    })
                    .await
                else {
                    return;
                };
                if results.send(should_scan_rx).await.is_err() {
                    return;
                };
            }
        }
    }

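    /// Receives one `oneshot::Receiver` per candidate path, in the order the paths were produced,
    /// and forwards only those paths whose contents were confirmed to match on to the full-scan
    /// stage; paths whose oneshot sender was dropped without sending are skipped.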
    async fn maintain_sorted_search_results(
        &self,
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
    ) {
        let mut rx = pin!(rx);
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, hence skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
        }
    }

    /// Background workers cannot open buffers by themselves, hence the main thread does it on their behalf.
    async fn open_buffers<'a>(
        &'a self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<Entity<Buffer>>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(requested_path) = rx.recv().await {
                let Some(buffer) = self
                    .buffer_store
                    .update(&mut cx, |this, cx| this.open_buffer(requested_path, cx))?
                    .await
                    .log_err()
                else {
                    continue;
                };
                find_all_matches_tx.send(buffer).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }

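    /// Turns opened buffers into `BufferSnapshot`s on the main thread, so that background workers
    /// can run the full-match scan against an immutable view of each buffer.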
    async fn grab_buffer_snapshots<'a>(
        &'a self,
        rx: Receiver<Entity<Buffer>>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        mut cx: AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(buffer) = rx.recv().await {
                let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

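/// A background worker driving the search pipeline. Every worker services all of the stages
/// described on the fields below; the per-stage channels connect the workers to each other and to
/// the main thread.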
struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    fs: &'search dyn Fs,
    /// Start off with all paths in the project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents).
    input_paths_rx: Receiver<InputPath<'search>>,

    /// After that, if the buffer is not yet loaded, we'll figure out whether it contains at least one match
    /// based on the on-disk contents of the buffer. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for the rest of the matches (and figure out their ranges).
    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in the background: run a full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
    async fn run(self) {
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());
        let handler = RequestHandler {
            query: self.query,
            open_entries: &self.open_buffers,
            fs: self.fs,
            matched_buffer_count: self.matched_buffer_count,
            matches_count: self.matches_count,
            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
            publish_matches: &self.publish_matches,
        };

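        // Branches are listed from the latest pipeline stage to the earliest: `select_biased!`
        // polls them in declaration order, so in-flight matches are drained and published before
        // new paths are pulled into the pipeline.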
        loop {
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    let Some(matches) = find_all_matches else {
                        self.publish_matches.close();
                        continue;
                    };
                    let result = handler.handle_find_all_matches(matches).await;
                    if let Some(_should_bail) = result {
                        self.publish_matches.close();
                        break;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        self.get_buffer_for_full_scan_tx.close();
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        self.confirm_contents_will_match_tx.close();
                    }
                }
                complete => break,
            }
        }
    }
}

struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

struct LimitReached;

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
    ) -> Option<LimitReached> {
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

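        // These counters are shared by all workers and are incremented before being compared, so
        // the totals may slightly overshoot the limits before `LimitReached` is reported.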
        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > MAX_SEARCH_RESULT_RANGES
        {
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }
    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file contents, throw away files that have invalid UTF-8 sequences early on;
                // that way we can still match files in a streaming fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                return Ok(());
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

    async fn handle_scan_path(&self, req: InputPath<'_>) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                settings,
                snapshot,
                should_scan_tx,
            } = req;
            if entry.is_dir() && entry.is_ignored {
                if !settings.is_path_excluded(&entry.path) {
                    // Self::scan_ignored_dir(
                    //     self.fs,
                    //     &snapshot,
                    //     &entry.path,
                    //     self.query,
                    //     &filter_tx,
                    //     &output_tx,
                    // )
                    // .await?;
                }
                return Ok(());
            }

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

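/// A single worktree entry to be considered for the search, together with the oneshot sender used
/// to report (in path order) whether its contents turned out to be worth a full scan.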
struct InputPath<'worker> {
    entry: &'worker Entry,
    settings: &'worker WorktreeSettings,
    snapshot: &'worker Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

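/// A path that is not currently open in memory, whose on-disk contents still need to be checked
/// for at least one match before a full scan is scheduled.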
struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}