use std::{
    io::{BufRead, BufReader},
    path::Path,
    pin::pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased};
use gpui::{App, AsyncApp, Entity};
use language::{Buffer, BufferSnapshot};
use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};

use util::{ResultExt, maybe};
use worktree::{Entry, ProjectEntryId, Snapshot, WorktreeSettings};

use crate::{
    ProjectItem, ProjectPath,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
};

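/// Drives a project-wide search: worktree entries are filtered and checked on a pool of
/// background workers, candidate files are opened as buffers on the main thread, and the
/// resulting matches are streamed back to the caller as `SearchResult`s.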
pub(crate) struct ProjectSearcher {
    pub(crate) fs: Arc<dyn Fs>,
    pub(crate) buffer_store: Entity<BufferStore>,
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
    pub(crate) open_buffers: HashSet<ProjectEntryId>,
}

const MAX_SEARCH_RESULT_FILES: usize = 5_000;
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

impl ProjectSearcher {
    pub(crate) fn run(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();

        let buffers = self.buffer_store.read(cx);
        for handle in buffers.buffers() {
            let buffer = handle.read(cx);
            if !buffers.is_searchable(&buffer.remote_id()) {
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // limit = limit.saturating_sub(1); todo!()
                unnamed_buffers.push(handle)
            };
        }
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
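        // Pipeline wiring: `provide_search_paths` feeds worktree entries to the workers,
        // confirmed candidates are forwarded (in path order) to `get_buffer_for_full_scan_tx`,
        // the main thread opens those buffers in `open_buffers` and hands snapshots back over
        // `find_all_matches_tx`, and workers publish final results on `tx`.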
        cx.spawn(async move |cx| {
            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
                bounded(MAX_CONCURRENT_BUFFER_OPENS);
            let matches_count = AtomicUsize::new(0);
            let matched_buffer_count = AtomicUsize::new(0);
            let worker_pool = executor.scoped(|scope| {
                let (input_paths_tx, input_paths_rx) = bounded(64);
                let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = bounded(64);
                let (sorted_search_results_tx, sorted_search_results_rx) = bounded(64);
                for _ in 0..executor.num_cpus() {
                    let worker = Worker {
                        query: &query,
                        open_buffers: &self.open_buffers,
                        matched_buffer_count: &matched_buffer_count,
                        matches_count: &matches_count,
                        fs: &*self.fs,
                        input_paths_rx: input_paths_rx.clone(),
                        confirm_contents_will_match_rx: confirm_contents_will_match_rx.clone(),
                        confirm_contents_will_match_tx: confirm_contents_will_match_tx.clone(),
                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                        find_all_matches_rx: find_all_matches_rx.clone(),
                        publish_matches: tx.clone(),
                    };
                    scope.spawn(worker.run());
                }
                scope.spawn(self.provide_search_paths(
                    &query,
                    input_paths_tx,
                    sorted_search_results_tx,
                ));
                scope.spawn(self.maintain_sorted_search_results(
                    sorted_search_results_rx,
                    get_buffer_for_full_scan_tx,
                ))
            });
            let open_buffers =
                self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx);
            futures::future::join(worker_pool, open_buffers).await;

            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
            if limit_reached {
                _ = tx.send(SearchResult::LimitReached).await;
            }
        })
        .detach();
        rx
    }

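    /// Feeds every worktree entry to the worker pool, and forwards a oneshot receiver per
    /// entry so that confirmed matches can be consumed in the original worktree order.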
    async fn provide_search_paths<'this>(
        &'this self,
        query: &SearchQuery,
        tx: Sender<InputPath<'this>>,
        results: Sender<oneshot::Receiver<ProjectPath>>,
    ) {
        for (snapshot, worktree_settings) in &self.snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                let (should_scan_tx, should_scan_rx) = oneshot::channel();
                let Ok(_) = tx
                    .send(InputPath {
                        entry,
                        settings: worktree_settings,
                        snapshot,
                        should_scan_tx,
                    })
                    .await
                else {
                    return;
                };
                if results.send(should_scan_rx).await.is_err() {
                    return;
                };
            }
        }
    }

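    /// Awaits the per-entry oneshot receivers in the order the entries were produced, so
    /// buffers are requested for a full scan in a deterministic order.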
    async fn maintain_sorted_search_results(
        &self,
        rx: Receiver<oneshot::Receiver<ProjectPath>>,
        paths_for_full_scan: Sender<ProjectPath>,
    ) {
        let mut rx = pin!(rx);
        while let Some(mut next_path_result) = rx.next().await {
            let Some(successful_path) = next_path_result.next().await else {
                // This path did not produce a match, so skip it.
                continue;
            };
            if paths_for_full_scan.send(successful_path).await.is_err() {
                return;
            };
        }
    }

    /// Background workers cannot open buffers by themselves, hence the main thread does it on their behalf.
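    /// Each opened buffer's snapshot is handed back to the worker pool for the full match scan.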
    async fn open_buffers<'a>(
        &'a self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        cx: &mut AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(requested_path) = rx.recv().await {
                let Some(buffer) = self
                    .buffer_store
                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
                    .await
                    .log_err()
                else {
                    continue;
                };
                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

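/// A background task that services all stages of the search pipeline, multiplexing over
/// whichever of its request channels has work available.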
struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    open_buffers: &'search HashSet<ProjectEntryId>,
    fs: &'search dyn Fs,
    /// Start off with all paths in the project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents).
    input_paths_rx: Receiver<InputPath<'search>>,

    /// After that, if the buffer is not yet loaded, figure out whether it contains at least one match
    /// based on its contents on disk. This step is not performed for buffers we already have in memory.
    confirm_contents_will_match_tx: Sender<MatchingEntry>,
    confirm_contents_will_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match (or are already in memory), look for the rest of the
    /// matches (and figure out their ranges). But wait: first we need to go back to the main thread
    /// to open a buffer (and create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// OK, we're back in the background: run the full scan and find all matches in the given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
    async fn run(self) {
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.confirm_contents_will_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());
        let handler = RequestHandler {
            query: self.query,
            open_entries: &self.open_buffers,
            fs: self.fs,
            matched_buffer_count: self.matched_buffer_count,
            matches_count: self.matches_count,
            confirm_contents_will_match_tx: &self.confirm_contents_will_match_tx,
            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
            publish_matches: &self.publish_matches,
        };

        loop {
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    let Some(matches) = find_all_matches else {
                        self.publish_matches.close();
                        continue;
                    };
                    let result = handler.handle_find_all_matches(matches).await;
                    if let Some(_should_bail) = result {
                        self.publish_matches.close();
                        break;
                    }
                },
                find_first_match = find_first_match.next() => {
                    if let Some(buffer_with_at_least_one_match) = find_first_match {
                        handler.handle_find_first_match(buffer_with_at_least_one_match).await;
                    } else {
                        self.get_buffer_for_full_scan_tx.close();
                    }
                },
                scan_path = scan_path.next() => {
                    if let Some(path_to_scan) = scan_path {
                        handler.handle_scan_path(path_to_scan).await;
                    } else {
                        self.confirm_contents_will_match_tx.close();
                    }
                }
                complete => break,
            }
        }
    }
}

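/// Per-request logic shared by the worker loop; borrows the worker's query, channels, and counters.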
struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    fs: &'worker dyn Fs,
    open_entries: &'worker HashSet<ProjectEntryId>,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    confirm_contents_will_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

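/// Marker returned by `handle_find_all_matches` once the global result limits have been exceeded.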
struct LimitReached;

impl RequestHandler<'_> {
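    /// Runs the query against an in-memory buffer snapshot, converts the matches to anchor
    /// ranges, and publishes them unless the file or range limits have been exceeded.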
    async fn handle_find_all_matches(
        &self,
        (buffer, snapshot): (Entity<Buffer>, BufferSnapshot),
    ) -> Option<LimitReached> {
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > MAX_SEARCH_RESULT_RANGES
        {
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }

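    /// Checks the on-disk contents of a file for at least one match, bailing out early on
    /// files whose leading bytes contain an invalid UTF-8 sequence (likely binary).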
    async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
        _ = maybe!(async move {
            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
            let Some(file) = self.fs.open_sync(&abs_path).await.log_err() else {
                return anyhow::Ok(());
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(error) = std::str::from_utf8(file_start) {
                if error.error_len().is_some() {
                    // Before attempting to match the file contents, throw away files that contain
                    // invalid UTF-8 sequences early on; that way we can still match files in a
                    // streaming fashion without having to look at "obviously binary" files.
                    log::debug!(
                        "Invalid UTF-8 sequence in file {abs_path:?} at byte position {}",
                        error.valid_up_to()
                    );
                    return Ok(());
                }
            }

            if self.query.detect(file).unwrap_or(false) {
                // Yes, we should scan the whole file.
                entry.should_scan_tx.send(entry.path).await?;
            }
            Ok(())
        })
        .await;
    }

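    /// Applies path-level filtering (ignored and excluded paths, include/exclude globs) and
    /// routes surviving entries either straight to a full scan (for already-open buffers) or
    /// to the on-disk content check.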
    async fn handle_scan_path(&self, req: InputPath<'_>) {
        _ = maybe!(async move {
            let InputPath {
                entry,
                settings,
                snapshot,
                should_scan_tx,
            } = req;
            if entry.is_dir() && entry.is_ignored {
                if !settings.is_path_excluded(&entry.path) {
                    // Self::scan_ignored_dir(
                    //     self.fs,
                    //     &snapshot,
                    //     &entry.path,
                    //     self.query,
                    //     &filter_tx,
                    //     &output_tx,
                    // )
                    // .await?;
                }
                return Ok(());
            }

            if entry.is_fifo || !entry.is_file() {
                return Ok(());
            }

            if self.query.filters_path() {
                let matched_path = if self.query.match_full_paths() {
                    let mut full_path = snapshot.root_name().as_std_path().to_owned();
                    full_path.push(entry.path.as_std_path());
                    self.query.match_path(&full_path)
                } else {
                    self.query.match_path(entry.path.as_std_path())
                };
                if !matched_path {
                    return Ok(());
                }
            }

            if self.open_entries.contains(&entry.id) {
                // The buffer is already in memory and that's the version we want to scan;
                // hence skip the dilly-dally and look for all matches straight away.
                self.get_buffer_for_full_scan_tx
                    .send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
            } else {
                self.confirm_contents_will_match_tx
                    .send(MatchingEntry {
                        should_scan_tx,
                        worktree_root: snapshot.abs_path().clone(),
                        path: ProjectPath {
                            worktree_id: snapshot.id(),
                            path: entry.path.clone(),
                        },
                    })
                    .await?;
            }

            anyhow::Ok(())
        })
        .await;
    }
}

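/// A worktree entry queued for path-level filtering, along with the oneshot sender used to
/// report that its contents should be fully scanned.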
struct InputPath<'worker> {
    entry: &'worker Entry,
    settings: &'worker WorktreeSettings,
    snapshot: &'worker Snapshot,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}

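/// A path whose on-disk contents still need to be checked for at least one match.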
struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    should_scan_tx: oneshot::Sender<ProjectPath>,
}