1use std::{
2 io::{BufRead, BufReader},
3 path::Path,
4 pin::pin,
5 sync::{
6 Arc,
7 atomic::{AtomicUsize, Ordering},
8 },
9};
10
11use collections::HashSet;
12use fs::Fs;
13use futures::{SinkExt, StreamExt, select_biased};
14use gpui::{App, AppContext, AsyncApp, Entity, Task};
15use language::{Buffer, BufferSnapshot};
16use postage::oneshot;
17use smol::channel::{Receiver, Sender, bounded, unbounded};
18
19use util::{ResultExt, maybe};
20use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
21
22use crate::{
23 ProjectItem, ProjectPath,
24 buffer_store::BufferStore,
25 search::{SearchQuery, SearchResult},
26};
27
28pub(crate) struct Search {
29 pub(crate) fs: Arc<dyn Fs>,
30 pub(crate) buffer_store: Entity<BufferStore>,
31 pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
32 pub(crate) open_buffers: HashSet<ProjectEntryId>,
33}
34
/// Maximum number of buffers reported by a single search; going past it makes the search
/// report `SearchResult::LimitReached`.
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
/// Maximum total number of match ranges reported by a single search; going past it makes the
/// search report `SearchResult::LimitReached`.
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
37
38/// Represents results of project search and allows one to either obtain match positions OR
39/// just the handles to buffers that may match the search.
40#[must_use]
41pub(crate) struct SearchResultsHandle {
42 results: Receiver<SearchResult>,
43 matching_buffers: Receiver<Entity<Buffer>>,
44 trigger_search: Box<dyn FnOnce(&mut App) -> Task<()> + Send + Sync>,
45}
46
47impl SearchResultsHandle {
48 pub(crate) fn results(self, cx: &mut App) -> Receiver<SearchResult> {
49 (self.trigger_search)(cx).detach();
50 self.results
51 }
52 pub(crate) fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
53 (self.trigger_search)(cx).detach();
54 self.matching_buffers
55 }
56}
57
58impl Search {
59 /// Prepares a project search run. The result has to be used to specify whether you're interested in matching buffers
60 /// or full search results.
61 pub(crate) fn into_results(self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle {
62 let mut open_buffers = HashSet::default();
63 let mut unnamed_buffers = Vec::new();
64 const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
65 let buffers = self.buffer_store.read(cx);
66 for handle in buffers.buffers() {
67 let buffer = handle.read(cx);
68 if !buffers.is_searchable(&buffer.remote_id()) {
69 continue;
70 } else if let Some(entry_id) = buffer.entry_id(cx) {
71 open_buffers.insert(entry_id);
72 } else {
73 // limit = limit.saturating_sub(1); todo!()
74 unnamed_buffers.push(handle)
75 };
76 }
77 let executor = cx.background_executor().clone();
78 let (tx, rx) = unbounded();
79 let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
80 let matching_buffers = grab_buffer_snapshot_rx.clone();
81 let trigger_search = Box::new(|cx: &mut App| {
82 cx.spawn(async move |cx| {
83 for buffer in unnamed_buffers {
84 _ = grab_buffer_snapshot_tx.send(buffer).await;
85 }
86
87 let (find_all_matches_tx, find_all_matches_rx) =
88 bounded(MAX_CONCURRENT_BUFFER_OPENS);
89
90 let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = unbounded();
91 let matches_count = AtomicUsize::new(0);
92 let matched_buffer_count = AtomicUsize::new(0);
93 let worker_pool = executor.scoped(|scope| {
94 let (input_paths_tx, input_paths_rx) = bounded(64);
95 let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) =
96 bounded(64);
97 let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
98 for _ in 0..executor.num_cpus() {
99 let worker = Worker {
100 query: &query,
101 open_buffers: &self.open_buffers,
102 matched_buffer_count: &matched_buffer_count,
103 matches_count: &matches_count,
104 fs: &*self.fs,
105 input_paths_rx: input_paths_rx.clone(),
106 confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
107 confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
108 get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
109 find_all_matches_rx: find_all_matches_rx.clone(),
110 publish_matches: tx.clone(),
111 };
112 scope.spawn(worker.run());
113 }
114 scope.spawn(self.provide_search_paths(
115 &query,
116 input_paths_tx,
117 sorted_search_results_tx,
118 ));
119 scope.spawn(self.maintain_sorted_search_results(
120 sorted_search_results_rx,
121 get_buffer_for_full_scan_tx,
122 ))
123 });
124 let open_buffers = self.open_buffers(
125 get_buffer_for_full_scan_rx,
126 grab_buffer_snapshot_tx,
127 cx.clone(),
128 );
129 let buffer_snapshots = self.grab_buffer_snapshots(
130 grab_buffer_snapshot_rx,
131 find_all_matches_tx,
132 cx.clone(),
133 );
134 futures::future::join3(worker_pool, buffer_snapshots, open_buffers).await;
135
136 let limit_reached = matches_count.load(Ordering::Acquire)
137 > MAX_SEARCH_RESULT_RANGES
138 || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
139 if limit_reached {
140 _ = tx.send(SearchResult::LimitReached).await;
141 }
142 })
143 });
144 SearchResultsHandle {
145 results: rx,
146 matching_buffers,
147 trigger_search,
148 }
149 }
150
151 async fn provide_search_paths<'this>(
152 &'this self,
153 query: &SearchQuery,
154 tx: Sender<InputPath<'this>>,
155 results: Sender<oneshot::Receiver<ProjectPath>>,
156 ) {
157 for (snapshot, worktree_settings) in &self.snapshots {
158 for entry in snapshot.entries(query.include_ignored(), 0) {
159 let (should_scan_tx, should_scan_rx) = oneshot::channel();
160 let Ok(_) = tx
161 .send(InputPath {
162 entry,
163 settings: worktree_settings,
164 snapshot: snapshot,
165 should_scan_tx,
166 })
167 .await
168 else {
169 return;
170 };
171 if results.send(should_scan_rx).await.is_err() {
172 return;
173 };
174 }
175 }
176 }
177
178 async fn maintain_sorted_search_results(
179 &self,
180 rx: Receiver<oneshot::Receiver<ProjectPath>>,
181 paths_for_full_scan: Sender<ProjectPath>,
182 ) {
183 let mut rx = pin!(rx);
184 while let Some(mut next_path_result) = rx.next().await {
185 let Some(successful_path) = next_path_result.next().await else {
186 // This math did not produce a match, hence skip it.
187 continue;
188 };
189 if paths_for_full_scan.send(successful_path).await.is_err() {
190 return;
191 };
192 }
193 }
194
195 /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
196 async fn open_buffers<'a>(
197 &'a self,
198 rx: Receiver<ProjectPath>,
199 find_all_matches_tx: Sender<Entity<Buffer>>,
200 mut cx: AsyncApp,
201 ) {
202 _ = maybe!(async move {
203 while let Ok(requested_path) = rx.recv().await {
204 let Some(buffer) = self
205 .buffer_store
206 .update(&mut cx, |this, cx| this.open_buffer(requested_path, cx))?
207 .await
208 .log_err()
209 else {
210 continue;
211 };
212 find_all_matches_tx.send(buffer).await?;
213 }
214 Result::<_, anyhow::Error>::Ok(())
215 })
216 .await;
217 }
218
219 async fn grab_buffer_snapshots<'a>(
220 &'a self,
221 rx: Receiver<Entity<Buffer>>,
222 find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
223 mut cx: AsyncApp,
224 ) {
225 _ = maybe!(async move {
226 while let Ok(buffer) = rx.recv().await {
227 let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?;
228 find_all_matches_tx.send((buffer, snapshot)).await?;
229 }
230 Result::<_, anyhow::Error>::Ok(())
231 })
232 .await;
233 }
234}
235
236struct Worker<'search> {
237 query: &'search SearchQuery,
238 matched_buffer_count: &'search AtomicUsize,
239 matches_count: &'search AtomicUsize,
240 open_buffers: &'search HashSet<ProjectEntryId>,
241 fs: &'search dyn Fs,
242 /// Start off with all paths in project and filter them based on:
243 /// - Include filters
244 /// - Exclude filters
245 /// - Only open buffers
246 /// - Scan ignored files
247 /// Put another way: filter out files that can't match (without looking at file contents)
248 input_paths_rx: Receiver<InputPath<'search>>,
249
250 /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match
251 /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory.
252 confirm_contents_will_match_tx: Sender<MatchingEntry>,
253 confirm_contents_will_match_rx: Receiver<MatchingEntry>,
254 /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges).
255 /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
256 get_buffer_for_full_scan_tx: Sender<ProjectPath>,
257 /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
258 find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
259 /// Cool, we have results; let's share them with the world.
260 publish_matches: Sender<SearchResult>,
261}
262
263impl Worker<'_> {
264 async fn run(self) {
265 let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
266 let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
267 let mut scan_path = pin!(self.input_paths_rx.fuse());
268 let handler = RequestHandler {
269 query: self.query,
270 open_entries: &self.open_buffers,
271 fs: self.fs,
272 matched_buffer_count: self.matched_buffer_count,
273 matches_count: self.matches_count,
274 confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
275 get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
276 publish_matches: &self.publish_matches,
277 };
278
279 loop {
280 select_biased! {
281 find_all_matches = find_all_matches.next() => {
282 let Some(matches) = find_all_matches else {
283 self.publish_matches.close();
284 continue;
285 };
286 let result = handler.handle_find_all_matches(matches).await;
287 if let Some(_should_bail) = result {
288 self.publish_matches.close();
289 break;
290 }
291 },
292 find_first_match = find_first_match.next() => {
293 if let Some(buffer_with_at_least_one_match) = find_first_match {
294 handler.handle_find_first_match(buffer_with_at_least_one_match).await;
295 } else {
296 self.get_buffer_for_full_scan_tx.close();
297 }
298
299 },
300 scan_path = scan_path.next() => {
301 if let Some(path_to_scan) = scan_path {
302 handler.handle_scan_path(path_to_scan).await;
303 } else {
304 self.confirm_contents_will_match_tx.close();
305 }
306
307 }
308 complete => break,
309 }
310 }
311 }
312}
313
314struct RequestHandler<'worker> {
315 query: &'worker SearchQuery,
316 fs: &'worker dyn Fs,
317 open_entries: &'worker HashSet<ProjectEntryId>,
318 matched_buffer_count: &'worker AtomicUsize,
319 matches_count: &'worker AtomicUsize,
320
321 confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
322 get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
323 publish_matches: &'worker Sender<SearchResult>,
324}
325
326struct LimitReached;
327
328impl RequestHandler<'_> {
329 async fn handle_find_all_matches(
330 &self,
331 (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
332 ) -> Option<LimitReached> {
333 let ranges = self
334 .query
335 .search(&snapshot, None)
336 .await
337 .iter()
338 .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
339 .collect::<Vec<_>>();
340
341 let matched_ranges = ranges.len();
342 if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
343 || self
344 .matches_count
345 .fetch_add(matched_ranges, Ordering::Release)
346 > MAX_SEARCH_RESULT_RANGES
347 {
348 Some(LimitReached)
349 } else {
350 _ = self
351 .publish_matches
352 .send(SearchResult::Buffer { buffer, ranges })
353 .await;
354 None
355 }
356 }
357 async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
358 _=maybe!(async move {
359 let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
360 let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
361 return anyhow::Ok(());
362 };
363
364 let mut file = BufReader::new(file);
365 let file_start = file.fill_buf()?;
366
367 if let Err(Some(starting_position)) =
368 std::str::from_utf8(file_start).map_err(|e| e.error_len())
369 {
370 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
371 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
372 log::debug!(
373 "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
374 );
375 return Ok(());
376 }
377
378 if self.query.detect(file).unwrap_or(false) {
379 // Yes, we should scan the whole file.
380 entry.should_scan_tx.send(entry.path).await?;
381 }
382 Ok(())
383 }).await;
384 }
385
386 async fn handle_scan_path(&self, req: InputPath<'_>) {
387 _ = maybe!(async move {
388 let InputPath {
389 entry,
390 settings,
391 snapshot,
392 should_scan_tx,
393 } = req;
394 if entry.is_dir() && entry.is_ignored {
395 if !settings.is_path_excluded(&entry.path) {
396 // Self::scan_ignored_dir(
397 // self.fs,
398 // &snapshot,
399 // &entry.path,
400 // self.query,
401 // &filter_tx,
402 // &output_tx,
403 // )
404 // .await?;
405 }
406 return Ok(());
407 }
408
409 if entry.is_fifo || !entry.is_file() {
410 return Ok(());
411 }
412
413 if self.query.filters_path() {
414 let matched_path = if self.query.match_full_paths() {
415 let mut full_path = snapshot.root_name().as_std_path().to_owned();
416 full_path.push(entry.path.as_std_path());
417 self.query.match_path(&full_path)
418 } else {
419 self.query.match_path(entry.path.as_std_path())
420 };
421 if !matched_path {
422 return Ok(());
423 }
424 }
425
426 if self.open_entries.contains(&entry.id) {
427 // The buffer is already in memory and that's the version we want to scan;
428 // hence skip the dilly-dally and look for all matches straight away.
429 self.get_buffer_for_full_scan_tx
430 .send(ProjectPath {
431 worktree_id: snapshot.id(),
432 path: entry.path.clone(),
433 })
434 .await?;
435 } else {
436 self.confirm_contents_will_match_tx
437 .send(MatchingEntry {
438 should_scan_tx: should_scan_tx,
439 worktree_root: snapshot.abs_path().clone(),
440 path: ProjectPath {
441 worktree_id: snapshot.id(),
442 path: entry.path.clone(),
443 },
444 })
445 .await?;
446 }
447
448 anyhow::Ok(())
449 })
450 .await;
451 }
452}
453
454struct InputPath<'worker> {
455 entry: &'worker Entry,
456 settings: &'worker WorktreeSettings,
457 snapshot: &'worker Snapshot,
458 should_scan_tx: oneshot::Sender<ProjectPath>,
459}
460
461struct MatchingEntry {
462 worktree_root: Arc<Path>,
463 path: ProjectPath,
464 should_scan_tx: oneshot::Sender<ProjectPath>,
465}