use std::{
    ops::{ControlFlow, Range},
    path::Path,
    pin::{Pin, pin},
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use futures::{FutureExt, StreamExt, future::BoxFuture, select_biased};
use gpui::{App, AsyncApp, Entity, WeakEntity};
use language::{Buffer, BufferSnapshot};
use postage::oneshot;
use smol::channel::{Receiver, Sender, bounded, unbounded};
use text::Anchor;
use util::{ResultExt, maybe};
use worktree::{Entry, Snapshot, WorktreeSettings};

use crate::{
    ProjectPath,
    buffer_store::BufferStore,
    search::{SearchQuery, SearchResult},
};

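/// Runs a project-wide text search: walks the given worktree snapshots on
/// background workers and streams [`SearchResult`]s back to the caller.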
pub(crate) struct ProjectSearcher {
    buffer_store: WeakEntity<BufferStore>,
    pub(crate) snapshots: Vec<(Snapshot, WorktreeSettings)>,
}

const MAX_SEARCH_RESULT_FILES: usize = 5_000;
const MAX_SEARCH_RESULT_RANGES: usize = 10_000;

impl ProjectSearcher {
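    /// Kicks off the search on background workers and returns a channel of results.
    ///
    /// A minimal usage sketch (how `searcher` and `query` are constructed is
    /// outside the scope of this module):
    ///
    /// ```ignore
    /// let results = searcher.search(query, cx);
    /// cx.spawn(async move |_| {
    ///     while let Ok(result) = results.recv().await {
    ///         match result {
    ///             SearchResult::Buffer { buffer, ranges } => { /* show the matches */ }
    ///             SearchResult::LimitReached => { /* too many results; stop here */ }
    ///         }
    ///     }
    /// })
    /// .detach();
    /// ```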
    pub(crate) fn search(self, query: SearchQuery, cx: &mut App) -> Receiver<SearchResult> {
        let executor = cx.background_executor().clone();
        let (tx, rx) = unbounded();
        cx.spawn(async move |cx| {
            const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
            let (find_all_matches_tx, find_all_matches_rx) = bounded(MAX_CONCURRENT_BUFFER_OPENS);
            let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) =
                bounded(MAX_CONCURRENT_BUFFER_OPENS);
            let matches_count = AtomicUsize::new(0);
            let matched_buffer_count = AtomicUsize::new(0);
            let worker_pool = executor.scoped(|scope| {
                let (input_paths_tx, input_paths_rx) = bounded(64);
                let (find_first_match_tx, find_first_match_rx) = bounded(64);

                for _ in 0..executor.num_cpus() {
                    let worker = Worker {
                        query: &query,
                        matched_buffer_count: &matched_buffer_count,
                        matches_count: &matches_count,
                        input_paths_rx: input_paths_rx.clone(),
                        find_first_match_rx: find_first_match_rx.clone(),
                        find_first_match_tx: find_first_match_tx.clone(),
                        get_buffer_for_full_scan_tx: get_buffer_for_full_scan_tx.clone(),
                        find_all_matches_rx: find_all_matches_rx.clone(),
                        publish_matches: tx.clone(),
                    };
                    scope.spawn(worker.run());
                }
                scope.spawn(self.provide_search_paths(&query, input_paths_tx))
            });
            // Drive the buffer opener and the worker pool concurrently: the workers
            // ask the main thread for buffers and then wait for the opened buffers
            // to come back, so awaiting one future to completion before the other
            // would deadlock.
            futures::join!(
                worker_pool,
                self.open_buffers(get_buffer_for_full_scan_rx, find_all_matches_tx, cx)
            );
            // A plain load cannot use `Release`; `Acquire` pairs with the `Release`
            // increments performed by the workers.
            let limit_reached = matches_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_RANGES
                || matched_buffer_count.load(Ordering::Acquire) > MAX_SEARCH_RESULT_FILES;
            if limit_reached {
                _ = tx.send(SearchResult::LimitReached).await;
            }
        })
        .detach();
        rx
    }

    async fn provide_search_paths<'a>(
        &'a self,
        query: &SearchQuery,
        tx: Sender<InputPath<'a>>,
    ) {
        for (snapshot, worktree_settings) in &self.snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                let Ok(_) = tx
                    .send(InputPath {
                        entry,
                        settings: worktree_settings,
                        snapshot,
                    })
                    .await
                else {
                    return;
                };
            }
        }
    }

    /// Background workers cannot open buffers themselves, so the main thread
    /// does it on their behalf.
    async fn open_buffers<'a>(
        &'a self,
        rx: Receiver<ProjectPath>,
        find_all_matches_tx: Sender<(Entity<Buffer>, BufferSnapshot)>,
        cx: &mut AsyncApp,
    ) {
        _ = maybe!(async move {
            while let Ok(requested_path) = rx.recv().await {
                let Some(buffer) = self
                    .buffer_store
                    .update(cx, |this, cx| this.open_buffer(requested_path, cx))?
                    .await
                    .log_err()
                else {
                    continue;
                };
                let snapshot = buffer.read_with(cx, |this, _| this.snapshot())?;
                find_all_matches_tx.send((buffer, snapshot)).await?;
            }
            Result::<_, anyhow::Error>::Ok(())
        })
        .await;
    }
}

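/// A background worker that drives search requests through the pipeline
/// described on the fields below.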
struct Worker<'search> {
    query: &'search SearchQuery,
    matched_buffer_count: &'search AtomicUsize,
    matches_count: &'search AtomicUsize,
    /// Start with all paths in the project and filter them based on:
    /// - Include filters
    /// - Exclude filters
    /// - Only open buffers
    /// - Scan ignored files
    ///
    /// Put another way: filter out files that can't match (without looking at file contents).
    input_paths_rx: Receiver<InputPath<'search>>,
    /// After that, figure out which paths contain at least one match (by looking at file contents). That's called a "partial scan".
    find_first_match_tx: Sender<MatchingEntry>,
    find_first_match_rx: Receiver<MatchingEntry>,
    /// Of those that contain at least one match, look for the rest of the matches (and figure out their ranges).
    /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it).
    get_buffer_for_full_scan_tx: Sender<ProjectPath>,
    /// Ok, we're back in the background: run a full scan & find all matches in a given buffer snapshot.
    find_all_matches_rx: Receiver<(Entity<Buffer>, BufferSnapshot)>,
    /// Cool, we have results; let's share them with the world.
    publish_matches: Sender<SearchResult>,
}

impl Worker<'_> {
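    /// Pulls requests off the pipeline channels until all of them close.
    /// `select_biased!` prefers the later pipeline stages, so work already in
    /// flight is drained before new paths are pulled in.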
    async fn run(self) {
        let mut find_all_matches = pin!(self.find_all_matches_rx.fuse());
        let mut find_first_match = pin!(self.find_first_match_rx.fuse());
        let mut scan_path = pin!(self.input_paths_rx.fuse());
        let handler = RequestHandler {
            query: self.query,
            matched_buffer_count: self.matched_buffer_count,
            matches_count: self.matches_count,
            find_first_match_tx: &self.find_first_match_tx,
            get_buffer_for_full_scan_tx: &self.get_buffer_for_full_scan_tx,
            publish_matches: &self.publish_matches,
        };
        loop {
            select_biased! {
                find_all_matches = find_all_matches.next() => {
                    if handler.handle_find_all_matches(find_all_matches).await.is_some() {
                        // A global result limit was exceeded; this worker is done.
                        return;
                    }
                },
                find_first_match = find_first_match.next() => {
                    // TODO: run the partial scan that checks whether this entry
                    // contains at least one match, and if so forward it to
                    // `get_buffer_for_full_scan_tx`. Not implemented yet.
                    let _ = find_first_match;
                },
                scan_path = scan_path.next() => {
                    handler.handle_scan_path(scan_path).await;
                }
                complete => break,
            }
        }
    }
}

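/// Per-request logic shared by a worker's `select_biased!` loop; borrows the
/// worker's counters and outgoing channels.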
struct RequestHandler<'worker> {
    query: &'worker SearchQuery,
    matched_buffer_count: &'worker AtomicUsize,
    matches_count: &'worker AtomicUsize,

    find_first_match_tx: &'worker Sender<MatchingEntry>,
    get_buffer_for_full_scan_tx: &'worker Sender<ProjectPath>,
    publish_matches: &'worker Sender<SearchResult>,
}

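/// Marker signalling that one of the global search limits
/// (`MAX_SEARCH_RESULT_FILES` / `MAX_SEARCH_RESULT_RANGES`) has been exceeded.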
struct LimitReached;

impl RequestHandler<'_> {
    async fn handle_find_all_matches(
        &self,
        req: Option<(Entity<Buffer>, BufferSnapshot)>,
    ) -> Option<LimitReached> {
        // `None` means the stream of opened buffers has ended; there is no
        // limit to report in that case.
        let (buffer, snapshot) = req?;
        let ranges = self
            .query
            .search(&snapshot, None)
            .await
            .iter()
            .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end))
            .collect::<Vec<_>>();

        let matched_ranges = ranges.len();
        if self.matched_buffer_count.fetch_add(1, Ordering::Release) > MAX_SEARCH_RESULT_FILES
            || self
                .matches_count
                .fetch_add(matched_ranges, Ordering::Release)
                > MAX_SEARCH_RESULT_RANGES
        {
            Some(LimitReached)
        } else {
            _ = self
                .publish_matches
                .send(SearchResult::Buffer { buffer, ranges })
                .await;
            None
        }
    }

    async fn handle_scan_path(&self, req: Option<InputPath<'_>>) {
        // `None` means the stream of input paths has ended.
        let Some(InputPath {
            entry,
            settings,
            snapshot,
        }) = req
        else {
            return;
        };
        if entry.is_dir() && entry.is_ignored {
            if !settings.is_path_excluded(&entry.path) {
                // TODO: recurse into this ignored directory and feed its files
                // back into the pipeline; that requires a filesystem handle,
                // which the worker does not hold yet.
            }
            return;
        }

        if entry.is_fifo || !entry.is_file() {
            return;
        }

        if self.query.filters_path() {
            let matched_path = if self.query.match_full_paths() {
                let mut full_path = snapshot.root_name().as_std_path().to_owned();
                full_path.push(entry.path.as_std_path());
                self.query.match_path(&full_path)
            } else {
                self.query.match_path(entry.path.as_std_path())
            };
            if !matched_path {
                return;
            }
        }

        // TODO: entries whose buffers are already open should skip the content
        // filter and go straight to `get_buffer_for_full_scan_tx`; the set of
        // open entries is not threaded through to the workers yet.
        let (tx, _respond_rx) = oneshot::channel();
        // Nothing consumes the receiving half yet; the partial-scan stage is
        // still a TODO (see `Worker::run`).
        _ = self
            .find_first_match_tx
            .send(MatchingEntry {
                respond: tx,
                worktree_root: snapshot.abs_path().clone(),
                path: ProjectPath {
                    worktree_id: snapshot.id(),
                    path: entry.path.clone(),
                },
            })
            .await;
    }
}

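/// A single worktree entry to consider, along with its worktree's settings and
/// snapshot.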
struct InputPath<'worker> {
    entry: &'worker Entry,
    settings: &'worker WorktreeSettings,
    snapshot: &'worker Snapshot,
}

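/// A path that passed the cheap path-based filters and is waiting for the
/// content ("partial") scan; `respond` reports the path back if the file
/// contains at least one match.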
struct MatchingEntry {
    worktree_root: Arc<Path>,
    path: ProjectPath,
    respond: oneshot::Sender<ProjectPath>,
}