1use std::{
2 io::{BufRead, BufReader},
3 path::Path,
4 pin::pin,
5 sync::{
6 Arc,
7 atomic::{AtomicUsize, Ordering},
8 },
9};
10
11use collections::HashSet;
12use fs::Fs;
13use futures::{SinkExt, StreamExt, select_biased};
14use gpui::{App, AsyncApp, Entity, WeakEntity};
15use language::{Buffer, BufferSnapshot};
16use postage::oneshot;
17use smol::channel::{Receiver, Sender, bounded, unbounded};
18
19use util::{ResultExt, maybe};
20use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};
21
22use crate::{
23 ProjectPath,
24 buffer_store::BufferStore,
25 search::{SearchQuery, SearchResult},
26};
27
28pub(crate) struct ProjectSearcher {
29 fs: Arc<dyn Fs>,
30 buffer_store: WeakEntity<BufferStore>,
31 pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
32 open_buffers: HashSet<ProjectEntryId>,
33}
34
/// Soft cap on matched buffers: once the running count exceeds it, workers stop and the search
/// reports `SearchResult::LimitReached`.
const MAX_SEARCH_RESULT_FILES: usize = 5_000;
/// Soft cap on matched ranges: once the running count exceeds it, workers stop and the search
/// reports `SearchResult::LimitReached`.
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;
37
38impl ProjectSearcher {
39 pub(crate) fn search(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
40 let executor = cx.background_executor().clone();
41 let (tx, rx) = unbounded();
42 cx.spawn(async move |cx| {
43 const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
44 let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
45 let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
46 bounded(MAX_CONCURRENT_BUFFER_OPENS);
47 let matches_count = AtomicUsize::new(0);
48 let matched_buffer_count = AtomicUsize::new(0);
49 let worker_pool = executor.scoped(|scope| {
50 let (input_paths_tx, input_paths_rx) = bounded(64);
51 let (find_first_match_tx, find_first_match_rx) = bounded(64);
52 let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
53 for _ in 0..executor.num_cpus() {
54 let worker = Worker {
55 query: &query,
56 open_buffers: &self.open_buffers,
57 matched_buffer_count: &matched_buffer_count,
58 matches_count: &matches_count,
59 fs: &*self.fs,
60 input_paths_rx: input_paths_rx.clone(),
61 find_first_match_rx: find_first_match_rx.clone(),
62 find_first_match_tx: find_first_match_tx.clone(),
63 get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
64 find_all_matches_rx: find_all_matches_rx.clone(),
65 publish_matches: tx.clone(),
66 };
67 scope.spawn(worker.run());
68 }
69 scope.spawn(self.provide_search_paths(
70 &query,
71 input_paths_tx,
72 sorted_search_results_tx,
73 ));
74 scope.spawn(self.maintain_sorted_search_results())
75 });
76 self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx)
77 .await;
78 worker_pool.await;
79 let limit_reached = matches_count.load(Ordering::Release) > MAX_SEARCH_RESULT_RANGES
80 || matched_buffer_count.load(Ordering::Release) > MAX_SEARCH_RESULT_FILES;
81 if limit_reached {
82 _ = tx.send(SearchResult::LimitReached).await;
83 }
84 })
85 .detach();
86 rx
87 }
88
89 async fn provide_search_paths<'this>(
90 &'this self,
91 query: &SearchQuery,
92 tx: Sender<InputPath<'this>>,
93 results: Sender<oneshot::Receiver<ProjectPath>>,
94 ) {
95 for (snapshot, worktree_settings) in &self.snapshots {
96 for entry in snapshot.entries(query.include_ignored(), 0) {
97 let (should_scan_tx, should_scan_rx) = oneshot::channel();
98 let Ok(_) = tx
99 .send(InputPath {
100 entry,
101 settings: worktree_settings,
102 snapshot: snapshot,
103 should_scan_tx,
104 })
105 .await
106 else {
107 return;
108 };
109 results.send(should_scan_rx).await;
110 }
111 }
112 }
113
114 async fn maintain_sorted_search_results(
115 &self,
116 rx: Receiver<oneshot::Receiver<ProjectPath>>,
117 paths_for_full_scan: Sender<ProjectPath>,
118 ) {
119 let mut rx = pin!(rx);
120 while let Some(mut next_path_result) = rx.next().await {
121 let Some(successful_path) = next_path_result.next().await else {
122 // This math did not produce a match, hence skip it.
123 continue;
124 };
125 paths_for_full_scan.send(successful_path).await;
126 }
127 }
128
129 /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
130 async fn open_buffers<'a>(
131 &'a self,
132 rx: Receiver<ProjectPath>,
133 find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
134 cx: &mut AsyncApp,
135 ) {
136 _ = maybe!(async move {
137 while let Ok(requested_path) = rx.recv().await {
138 let Some(buffer) = self
139 .buffer_store
140 .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
141 .await
142 .log_err()
143 else {
144 continue;
145 };
146 let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
147 find_all_matches_tx.send((buffer, snapshot)).await?;
148 }
149 Result::<_, anyhow::Error>::Ok(())
150 })
151 .await;
152 }
153}
154
155struct Worker<'search> {
156 query: &'search SearchQuery,
157 matched_buffer_count: &'search AtomicUsize,
158 matches_count: &'search AtomicUsize,
159 open_buffers: &'search HashSet<ProjectEntryId>,
160 fs: &'search dyn Fs,
161 /// Start off with all paths in project and filter them based on:
162 /// - Include filters
163 /// - Exclude filters
164 /// - Only open buffers
165 /// - Scan ignored files
166 /// Put another way: filter out files that can't match (without looking at file contents)
167 input_paths_rx: Receiver<InputPath<'search>>,
168 /// After that, figure out which paths contain at least one match (look at file contents). That's called "partial scan".
169 find_first_match_tx: Sender<MatchingEntry>,
170 find_first_match_rx: Receiver<MatchingEntry>,
171 /// Of those that contain at least one match, look for rest of matches (and figure out their ranges).
172 /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
173 get_buffer_for_full_scan_tx: Sender<ProjectPath>,
174 /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
175 find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
176 /// Cool, we have results; let's share them with the world.
177 publish_matches: Sender<SearchResult>,
178}
179
180impl Worker<'_> {
181 async fn run(self) {
182 let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
183 let mut find_first_match = pin!(self.find_first_match_rx.fuse());
184 let mut scan_path = pin!(self.input_paths_rx.fuse());
185 let handler = RequestHandler {
186 query: self.query,
187 open_entries: &self.open_buffers,
188 fs: self.fs,
189 matched_buffer_count: self.matched_buffer_count,
190 matches_count: self.matches_count,
191 find_first_match_tx: &self.find_first_match_tx,
192 get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
193 publish_matches: &self.publish_matches,
194 };
195 loop {
196 select_biased! {
197 find_all_matches = find_all_matches.next() => {
198 let result = handler.handle_find_all_matches(find_all_matches).await;
199 if let Some(_should_bail) = result {
200 return;
201 }
202 },
203 find_first_match = find_first_match.next() => {
204 if let Some(buffer_with_at_least_one_match) = find_first_match {
205 handler.handle_find_first_match(buffer_with_at_least_one_match);
206 }
207
208 },
209 scan_path = scan_path.next() => {
210 if let Some(path_to_scan) = scan_path {
211 handler.handle_scan_path(path_to_scan).await;
212 }
213
214 }
215 complete => break,
216 }
217 }
218 }
219}
220
221struct RequestHandler<'worker> {
222 query: &'worker SearchQuery,
223 fs: &'worker dyn Fs,
224 open_entries: &'worker HashSet<ProjectEntryId>,
225 matched_buffer_count: &'worker AtomicUsize,
226 matches_count: &'worker AtomicUsize,
227
228 find_first_match_tx: &'worker Sender<MatchingEntry>,
229 get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
230 publish_matches: &'worker Sender<SearchResult>,
231}
232
/// Marker returned by a request handler once a search result cap has been exceeded; the
/// worker stops when it sees one.
struct LimitReached;
234
235impl RequestHandler<'_> {
236 async fn handle_find_all_matches(
237 &self,
238 req: Option<(Entity<Buffer>, BufferSnapshot)>,
239 ) -> Option<LimitReached> {
240 let Some((buffer, snapshot)) = req else {
241 unreachable!()
242 };
243 let ranges = self
244 .query
245 .search(&snapshot, None)
246 .await
247 .iter()
248 .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
249 .collect::<Vec<_>>();
250
251 let matched_ranges = ranges.len();
252 if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
253 || self
254 .matches_count
255 .fetch_add(matched_ranges, Ordering::Release)
256 > MAX_SEARCH_RESULT_RANGES
257 {
258 Some(LimitReached)
259 } else {
260 self.publish_matches
261 .send(SearchResult::Buffer { buffer, ranges })
262 .await;
263 None
264 }
265 }
266 async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
267 _=maybe!(async move {
268 let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
269 let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
270 return anyhow::Ok(());
271 };
272
273 let mut file = BufReader::new(file);
274 let file_start = file.fill_buf()?;
275
276 if let Err(Some(starting_position)) =
277 std::str::from_utf8(file_start).map_err(|e| e.error_len())
278 {
279 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
280 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
281 log::debug!(
282 "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
283 );
284 return Ok(());
285 }
286
287 if self.query.detect(file).unwrap_or(false) {
288 // Yes, we should scan the whole file.
289 entry.should_scan_tx.send(entry.path).await?;
290 }
291 Ok(())
292 }).await;
293 }
294
295 async fn handle_scan_path(&self, req: InputPath<'_>) {
296 _ = maybe!(async move {
297 let InputPath {
298 entry,
299 settings,
300 snapshot,
301 should_scan_tx,
302 } = req;
303 if entry.is_dir() && entry.is_ignored {
304 if !settings.is_path_excluded(&entry.path) {
305 Self::scan_ignored_dir(
306 self.fs,
307 &snapshot,
308 &entry.path,
309 self.query,
310 &filter_tx,
311 &output_tx,
312 )
313 .await?;
314 }
315 return Ok(());
316 }
317
318 if entry.is_fifo || !entry.is_file() {
319 return Ok(());
320 }
321
322 if self.query.filters_path() {
323 let matched_path = if self.query.match_full_paths() {
324 let mut full_path = snapshot.root_name().as_std_path().to_owned();
325 full_path.push(entry.path.as_std_path());
326 self.query.match_path(&full_path)
327 } else {
328 self.query.match_path(entry.path.as_std_path())
329 };
330 if !matched_path {
331 return Ok(());
332 }
333 }
334
335 let (mut tx, rx) = oneshot::channel();
336
337 if self.open_entries.contains(&entry.id) {
338 // The buffer is already in memory and that's the version we want to scan;
339 // hence skip the dilly-dally and look for all matches straight away.
340 self.get_buffer_for_full_scan_tx
341 .send(ProjectPath {
342 worktree_id: snapshot.id(),
343 path: entry.path.clone(),
344 })
345 .await?;
346 } else {
347 self.find_first_match_tx
348 .send(MatchingEntry {
349 should_scan_tx: should_scan_tx,
350 worktree_root: snapshot.abs_path().clone(),
351 path: ProjectPath {
352 worktree_id: snapshot.id(),
353 path: entry.path.clone(),
354 },
355 })
356 .await?;
357 }
358
359 output_tx.send(rx).await?;
360 anyhow::Ok(())
361 })
362 .await;
363 }
364}
365
366struct InputPath<'worker> {
367 entry: &'worker Entry,
368 settings: &'worker WorktreeSettings,
369 snapshot: &'worker Snapshot,
370 should_scan_tx: oneshot::Sender<ProjectPath>,
371}
372
373struct MatchingEntry {
374 worktree_root: Arc<Path>,
375 path: ProjectPath,
376 should_scan_tx: oneshot::Sender<ProjectPath>,
377}