use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased};
use gpui::{App, AsyncApp, Entity, WeakEntity};
use language::{Buffer, BufferSnapshot};
use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};

use util::{ResultExt, maybe};
use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};

use crate::{
    ProjectPath,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
};

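/// Runs a project-wide search across worktree snapshots, reusing buffers that
/// are already open and streaming `SearchResult`s back to the caller.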
pub(crate) struct ProjectSearcher {
    pub(crate) fs: Arc<dyn Fs>,
    pub(crate) buffer_store: WeakEntity<BufferStore>,
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
    pub(crate) open_buffers: HashSet<ProjectEntryId>,
}

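// Caps on how many matched files and ranges a single search reports; once
// either cap is exceeded, `SearchResult::LimitReached` is sent to the caller.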
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

impl ProjectSearcher {
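    /// Starts the search and returns a channel of results. Path scanning and
    /// matching run on a pool of background workers (one per CPU), while
    /// buffer opening is funneled through the main thread.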
    pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        cx.spawn(async move |cx| {
            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
                bounded(MAX_CONCURRENT_BUFFER_OPENS);
            let matches_count = AtomicUsize::new(0);
            let matched_buffer_count = AtomicUsize::new(0);
            let worker_pool = executor.scoped(|scope| {
                let (input_paths_tx, input_paths_rx) = bounded(64);
                let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64);
                let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                for _ in 0..executor.num_cpus() {
                    let worker = Worker {
                        query: &query,
                        open_buffers: &self.open_buffers,
                        matched_buffer_count: &matched_buffer_count,
                        matches_count: &matches_count,
                        fs: &*self.fs,
                        input_paths_rx: input_paths_rx.clone(),
                        confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
                        confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                        find_all_matches_rx: find_all_matches_rx.clone(),
                        publish_matches: tx.clone(),
                    };
                    scope.spawn(worker.run());
                }
                scope.spawn(self.provide_search_paths(
                    &query,
                    input_paths_tx,
                    sorted_search_results_tx,
                ));
                scope.spawn(self.maintain_sorted_search_results(
                    sorted_search_results_rx,
                    get_buffer_for_full_scan_tx,
                ));
            });
            let open_buffers =
                self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx);
            futures::future::join(worker_pool, open_buffers).await;

            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
            if limit_reached {
                _ = tx.send(SearchResult::LimitReached).await;
            }
        })
        .detach();
        rx
    }

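    /// Feeds every worktree entry to the workers for filtering, along with a
    /// oneshot channel that reports whether the entry should be fully scanned.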
    async fn provide_search_paths<'this>(
        &'this self,
        query: &SearchQuery,
        tx: Sender<InputPath<'this>>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) {
        for (snapshot, worktree_settings) in &self.snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                let (should_scan_tx, should_scan_rx) = oneshot::channel();
                let Ok(_) = tx
                    .send(InputPath {
                        entry,
                        settings: worktree_settings,
                        snapshot,
                        should_scan_tx,
                    })
                    .await
                else {
                    return;
                };
                if results.send(should_scan_rx).await.is_err() {
                    return;
                }
            }
        }
    }

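    /// Awaits the per-path confirmation channels in the order the paths were
    /// scanned, so that full-scan requests are issued in a stable order.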
    async fn maintain_sorted_search_results(
        &self,
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
    ) {
        let mut rx = pin!(rx);
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, so skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            }
        }
    }

    /// Background workers cannot open buffers by themselves, so the main thread
    /// opens them on their behalf.
    async fn open_buffers<'a>(
        &'a self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        cx: &mut AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(requested_path) = rx.recv().await {
                let Some(buffer) = self
                    .buffer_store
                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
                    .await
                    .log_err()
                else {
                    continue;
                };
                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

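/// A single background search worker. Each worker services every stage of the
/// pipeline; the field comments below describe those stages in order.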
struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    fs: &'search dyn Fs,
    /// Start off with all paths in the project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents).
    input_paths_rx: Receiver<InputPath<'search>>,

    /// After that, if the buffer is not yet loaded, figure out whether it contains at least one match
    /// based on its on-disk contents. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for the rest of the
    /// matches (and figure out their ranges). But wait: first we need to go back to the main thread
    /// to open a buffer (and create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in the background: run a full scan and find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
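    /// Processes requests from all pipeline stages until every channel closes,
    /// biased towards the later stages so that in-flight work is flushed out
    /// before new paths are picked up.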
    async fn run(self) {
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());
        let handler = RequestHandler {
            query: self.query,
            open_entries: self.open_buffers,
            fs: self.fs,
            matched_buffer_count: self.matched_buffer_count,
            matches_count: self.matches_count,
            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
            publish_matches: &self.publish_matches,
        };
        loop {
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    if handler.handle_find_all_matches(find_all_matches).await.is_some() {
                        // The result limit was reached; stop this worker.
                        return;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    }
                }
                complete => break,
            }
        }
    }
}

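/// Borrowed view of a `Worker`'s state, used to service a single request from
/// one of the pipeline channels.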
struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

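/// Returned by `handle_find_all_matches` once the search has produced more
/// matched files or ranges than the configured maximums.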
struct LimitReached;

impl RequestHandler<'_> {
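    /// Runs the query against an open buffer's snapshot and publishes all
    /// matched ranges, unless the global result limits have been exceeded.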
    async fn handle_find_all_matches(
        &self,
        req: Option<(Entity<Buffer>, BufferSnapshot)>,
    ) -> Option<LimitReached> {
        // `None` means the channel of buffers to scan has closed; there is nothing left to do.
        let (buffer, snapshot) = req?;
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > MAX_SEARCH_RESULT_RANGES
        {
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }

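    /// Cheaply checks the on-disk contents of a file to decide whether it is
    /// worth opening a buffer for it: bails out on invalid UTF-8 and only
    /// forwards paths whose contents contain at least one match.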
    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file contents, throw away files with invalid
                // UTF-8 sequences early on; that way we can still match files in a streaming
                // fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                return Ok(());
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

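    /// Filters a single worktree entry by path (include/exclude filters,
    /// non-files, fifos) and routes survivors either straight to a full scan
    /// (already-open buffers) or to the on-disk content check.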
    async fn handle_scan_path(&self, req: InputPath<'_>) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                settings,
                snapshot,
                should_scan_tx,
            } = req;
            if entry.is_dir() && entry.is_ignored {
                if !settings.is_path_excluded(&entry.path) {
                    // Self::scan_ignored_dir(
                    //     self.fs,
                    //     &snapshot,
                    //     &entry.path,
                    //     self.query,
                    //     &filter_tx,
                    //     &output_tx,
                    // )
                    // .await?;
                }
                return Ok(());
            }

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

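/// A worktree entry queued for path-level filtering.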
struct InputPath<'worker> {
    entry: &'worker Entry,
    settings: &'worker WorktreeSettings,
    snapshot: &'worker Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

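/// A path whose on-disk contents still need to be checked for at least one
/// match before a buffer is opened for it.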
struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}