1use std::{
2 cmp,
3 collections::VecDeque,
4 path::PathBuf,
5 sync::{
6 atomic::{AtomicUsize, Ordering::SeqCst},
7 Arc,
8 },
9};
10
11use anyhow::{anyhow, Context as _, Result};
12use collections::{HashMap, HashSet};
13use fs::Fs;
14use gpui::{AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, WeakModel};
15use rpc::{
16 proto::{self, AnyProtoClient},
17 TypedEnvelope,
18};
19use smol::{
20 channel::{Receiver, Sender},
21 lock::Semaphore,
22 stream::StreamExt,
23};
24use text::ReplicaId;
25use util::ResultExt;
26use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeId, WorktreeSettings};
27
28use crate::{search::SearchQuery, ProjectPath};
29
30pub struct WorktreeStore {
31 is_shared: bool,
32 worktrees: Vec<WorktreeHandle>,
33 worktrees_reordered: bool,
34}
35
36pub enum WorktreeStoreEvent {
37 WorktreeAdded(Model<Worktree>),
38 WorktreeRemoved(EntityId, WorktreeId),
39 WorktreeOrderChanged,
40}
41
42impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
43
44impl WorktreeStore {
45 pub fn new(retain_worktrees: bool) -> Self {
46 Self {
47 is_shared: retain_worktrees,
48 worktrees: Vec::new(),
49 worktrees_reordered: false,
50 }
51 }
52
53 /// Iterates through all worktrees, including ones that don't appear in the project panel
54 pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
55 self.worktrees
56 .iter()
57 .filter_map(move |worktree| worktree.upgrade())
58 }
59
60 /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
61 pub fn visible_worktrees<'a>(
62 &'a self,
63 cx: &'a AppContext,
64 ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
65 self.worktrees()
66 .filter(|worktree| worktree.read(cx).is_visible())
67 }
68
69 pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
70 self.worktrees()
71 .find(|worktree| worktree.read(cx).id() == id)
72 }
73
74 pub fn worktree_for_entry(
75 &self,
76 entry_id: ProjectEntryId,
77 cx: &AppContext,
78 ) -> Option<Model<Worktree>> {
79 self.worktrees()
80 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
81 }
82
83 pub fn entry_for_id<'a>(
84 &'a self,
85 entry_id: ProjectEntryId,
86 cx: &'a AppContext,
87 ) -> Option<&'a Entry> {
88 self.worktrees()
89 .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
90 }
91
92 pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
93 let push_strong_handle = self.is_shared || worktree.read(cx).is_visible();
94 let handle = if push_strong_handle {
95 WorktreeHandle::Strong(worktree.clone())
96 } else {
97 WorktreeHandle::Weak(worktree.downgrade())
98 };
99 if self.worktrees_reordered {
100 self.worktrees.push(handle);
101 } else {
102 let i = match self
103 .worktrees
104 .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
105 other.upgrade().map(|worktree| worktree.read(cx).abs_path())
106 }) {
107 Ok(i) | Err(i) => i,
108 };
109 self.worktrees.insert(i, handle);
110 }
111
112 cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
113
114 let handle_id = worktree.entity_id();
115 cx.observe_release(worktree, move |_, worktree, cx| {
116 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
117 handle_id,
118 worktree.id(),
119 ));
120 })
121 .detach();
122 }
123
124 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
125 self.worktrees.retain(|worktree| {
126 if let Some(worktree) = worktree.upgrade() {
127 if worktree.read(cx).id() == id_to_remove {
128 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
129 worktree.entity_id(),
130 id_to_remove,
131 ));
132 false
133 } else {
134 true
135 }
136 } else {
137 false
138 }
139 });
140 }
141
142 pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
143 self.worktrees_reordered = worktrees_reordered;
144 }
145
146 pub fn set_worktrees_from_proto(
147 &mut self,
148 worktrees: Vec<proto::WorktreeMetadata>,
149 replica_id: ReplicaId,
150 remote_id: u64,
151 client: AnyProtoClient,
152 cx: &mut ModelContext<Self>,
153 ) -> Result<()> {
154 let mut old_worktrees_by_id = self
155 .worktrees
156 .drain(..)
157 .filter_map(|worktree| {
158 let worktree = worktree.upgrade()?;
159 Some((worktree.read(cx).id(), worktree))
160 })
161 .collect::<HashMap<_, _>>();
162
163 for worktree in worktrees {
164 if let Some(old_worktree) =
165 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
166 {
167 self.worktrees.push(WorktreeHandle::Strong(old_worktree));
168 } else {
169 self.add(
170 &Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx),
171 cx,
172 );
173 }
174 }
175
176 Ok(())
177 }
178
179 pub fn move_worktree(
180 &mut self,
181 source: WorktreeId,
182 destination: WorktreeId,
183 cx: &mut ModelContext<Self>,
184 ) -> Result<()> {
185 if source == destination {
186 return Ok(());
187 }
188
189 let mut source_index = None;
190 let mut destination_index = None;
191 for (i, worktree) in self.worktrees.iter().enumerate() {
192 if let Some(worktree) = worktree.upgrade() {
193 let worktree_id = worktree.read(cx).id();
194 if worktree_id == source {
195 source_index = Some(i);
196 if destination_index.is_some() {
197 break;
198 }
199 } else if worktree_id == destination {
200 destination_index = Some(i);
201 if source_index.is_some() {
202 break;
203 }
204 }
205 }
206 }
207
208 let source_index =
209 source_index.with_context(|| format!("Missing worktree for id {source}"))?;
210 let destination_index =
211 destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
212
213 if source_index == destination_index {
214 return Ok(());
215 }
216
217 let worktree_to_move = self.worktrees.remove(source_index);
218 self.worktrees.insert(destination_index, worktree_to_move);
219 self.worktrees_reordered = true;
220 cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
221 cx.notify();
222 Ok(())
223 }
224
225 pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
226 for worktree in &self.worktrees {
227 if let Some(worktree) = worktree.upgrade() {
228 worktree.update(cx, |worktree, _| {
229 if let Some(worktree) = worktree.as_remote_mut() {
230 worktree.disconnected_from_host();
231 }
232 });
233 }
234 }
235 }
236
237 pub fn set_shared(&mut self, is_shared: bool, cx: &mut ModelContext<Self>) {
238 self.is_shared = is_shared;
239
240 // When shared, retain all worktrees
241 if is_shared {
242 for worktree_handle in self.worktrees.iter_mut() {
243 match worktree_handle {
244 WorktreeHandle::Strong(_) => {}
245 WorktreeHandle::Weak(worktree) => {
246 if let Some(worktree) = worktree.upgrade() {
247 *worktree_handle = WorktreeHandle::Strong(worktree);
248 }
249 }
250 }
251 }
252 }
253 // When not shared, only retain the visible worktrees
254 else {
255 for worktree_handle in self.worktrees.iter_mut() {
256 if let WorktreeHandle::Strong(worktree) = worktree_handle {
257 let is_visible = worktree.update(cx, |worktree, _| {
258 worktree.stop_observing_updates();
259 worktree.is_visible()
260 });
261 if !is_visible {
262 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
263 }
264 }
265 }
266 }
267 }
268
269 /// search over all worktrees (ignoring open buffers)
270 /// the query is tested against the file on disk and matching files are returned.
271 pub fn find_search_candidates(
272 &self,
273 query: SearchQuery,
274 limit: usize,
275 skip_entries: HashSet<ProjectEntryId>,
276 fs: Arc<dyn Fs>,
277 cx: &ModelContext<Self>,
278 ) -> Receiver<ProjectPath> {
279 let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024);
280 let snapshots = self
281 .visible_worktrees(cx)
282 .filter_map(|tree| {
283 let tree = tree.read(cx);
284 Some((tree.snapshot(), tree.as_local()?.settings()))
285 })
286 .collect::<Vec<_>>();
287 let include_root = snapshots.len() > 1;
288 let path_count: usize = snapshots
289 .iter()
290 .map(|(snapshot, _)| {
291 if query.include_ignored() {
292 snapshot.file_count()
293 } else {
294 snapshot.visible_file_count()
295 }
296 })
297 .sum();
298
299 let remaining_paths = AtomicUsize::new(limit);
300 if path_count == 0 {
301 return matching_paths_rx;
302 }
303 let workers = cx.background_executor().num_cpus().min(path_count);
304 let paths_per_worker = (path_count + workers - 1) / workers;
305
306 let executor = cx.background_executor().clone();
307 cx.background_executor()
308 .spawn(async move {
309 let fs = &fs;
310 let query = &query;
311 let matching_paths_tx = &matching_paths_tx;
312 let snapshots = &snapshots;
313 let remaining_paths = &remaining_paths;
314
315 executor
316 .scoped(move |scope| {
317 let max_concurrent_workers = Arc::new(Semaphore::new(workers));
318
319 for worker_ix in 0..workers {
320 let snapshots = snapshots.clone();
321 let worker_start_ix = worker_ix * paths_per_worker;
322 let worker_end_ix = worker_start_ix + paths_per_worker;
323 let skip_entries = skip_entries.clone();
324 let limiter = Arc::clone(&max_concurrent_workers);
325 scope.spawn(async move {
326 let _guard = limiter.acquire().await;
327 Self::search_snapshots(
328 &snapshots,
329 worker_start_ix,
330 worker_end_ix,
331 &query,
332 remaining_paths,
333 &matching_paths_tx,
334 &skip_entries,
335 include_root,
336 fs,
337 )
338 .await;
339 });
340 }
341
342 if query.include_ignored() {
343 for (snapshot, settings) in snapshots {
344 for ignored_entry in
345 snapshot.entries(true, 0).filter(|e| e.is_ignored)
346 {
347 let limiter = Arc::clone(&max_concurrent_workers);
348 scope.spawn(async move {
349 let _guard = limiter.acquire().await;
350 if remaining_paths.load(SeqCst) == 0 {
351 return;
352 }
353
354 Self::search_ignored_entry(
355 &snapshot,
356 &settings,
357 ignored_entry,
358 &fs,
359 &query,
360 remaining_paths,
361 &matching_paths_tx,
362 )
363 .await;
364 });
365 }
366 }
367 }
368 })
369 .await
370 })
371 .detach();
372 return matching_paths_rx;
373 }
374
375 #[allow(clippy::too_many_arguments)]
376 async fn search_snapshots(
377 snapshots: &Vec<(worktree::Snapshot, WorktreeSettings)>,
378 worker_start_ix: usize,
379 worker_end_ix: usize,
380 query: &SearchQuery,
381 remaining_paths: &AtomicUsize,
382 results_tx: &Sender<ProjectPath>,
383 skip_entries: &HashSet<ProjectEntryId>,
384 include_root: bool,
385 fs: &Arc<dyn Fs>,
386 ) {
387 let mut snapshot_start_ix = 0;
388 let mut abs_path = PathBuf::new();
389
390 for (snapshot, _) in snapshots {
391 let snapshot_end_ix = snapshot_start_ix
392 + if query.include_ignored() {
393 snapshot.file_count()
394 } else {
395 snapshot.visible_file_count()
396 };
397 if worker_end_ix <= snapshot_start_ix {
398 break;
399 } else if worker_start_ix > snapshot_end_ix {
400 snapshot_start_ix = snapshot_end_ix;
401 continue;
402 } else {
403 let start_in_snapshot = worker_start_ix.saturating_sub(snapshot_start_ix);
404 let end_in_snapshot = cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix;
405
406 for entry in snapshot
407 .files(false, start_in_snapshot)
408 .take(end_in_snapshot - start_in_snapshot)
409 {
410 if results_tx.is_closed() {
411 break;
412 }
413 if skip_entries.contains(&entry.id) {
414 continue;
415 }
416
417 let matched_path = if include_root {
418 let mut full_path = PathBuf::from(snapshot.root_name());
419 full_path.push(&entry.path);
420 query.file_matches(Some(&full_path))
421 } else {
422 query.file_matches(Some(&entry.path))
423 };
424
425 let matches = if matched_path {
426 abs_path.clear();
427 abs_path.push(&snapshot.abs_path());
428 abs_path.push(&entry.path);
429 if let Some(file) = fs.open_sync(&abs_path).await.log_err() {
430 query.detect(file).unwrap_or(false)
431 } else {
432 false
433 }
434 } else {
435 false
436 };
437
438 if matches {
439 if remaining_paths
440 .fetch_update(SeqCst, SeqCst, |value| {
441 if value > 0 {
442 Some(value - 1)
443 } else {
444 None
445 }
446 })
447 .is_err()
448 {
449 return;
450 }
451
452 let project_path = ProjectPath {
453 worktree_id: snapshot.id(),
454 path: entry.path.clone(),
455 };
456 if results_tx.send(project_path).await.is_err() {
457 return;
458 }
459 }
460 }
461
462 snapshot_start_ix = snapshot_end_ix;
463 }
464 }
465 }
466
467 async fn search_ignored_entry(
468 snapshot: &Snapshot,
469 settings: &WorktreeSettings,
470 ignored_entry: &Entry,
471 fs: &Arc<dyn Fs>,
472 query: &SearchQuery,
473 remaining_paths: &AtomicUsize,
474 counter_tx: &Sender<ProjectPath>,
475 ) {
476 let mut ignored_paths_to_process =
477 VecDeque::from([snapshot.abs_path().join(&ignored_entry.path)]);
478
479 while let Some(ignored_abs_path) = ignored_paths_to_process.pop_front() {
480 let metadata = fs
481 .metadata(&ignored_abs_path)
482 .await
483 .with_context(|| format!("fetching fs metadata for {ignored_abs_path:?}"))
484 .log_err()
485 .flatten();
486
487 if let Some(fs_metadata) = metadata {
488 if fs_metadata.is_dir {
489 let files = fs
490 .read_dir(&ignored_abs_path)
491 .await
492 .with_context(|| format!("listing ignored path {ignored_abs_path:?}"))
493 .log_err();
494
495 if let Some(mut subfiles) = files {
496 while let Some(subfile) = subfiles.next().await {
497 if let Some(subfile) = subfile.log_err() {
498 ignored_paths_to_process.push_back(subfile);
499 }
500 }
501 }
502 } else if !fs_metadata.is_symlink {
503 if !query.file_matches(Some(&ignored_abs_path))
504 || settings.is_path_excluded(&ignored_entry.path)
505 {
506 continue;
507 }
508 let matches = if let Some(file) = fs
509 .open_sync(&ignored_abs_path)
510 .await
511 .with_context(|| format!("Opening ignored path {ignored_abs_path:?}"))
512 .log_err()
513 {
514 query.detect(file).unwrap_or(false)
515 } else {
516 false
517 };
518
519 if matches {
520 if remaining_paths
521 .fetch_update(SeqCst, SeqCst, |value| {
522 if value > 0 {
523 Some(value - 1)
524 } else {
525 None
526 }
527 })
528 .is_err()
529 {
530 return;
531 }
532
533 let project_path = ProjectPath {
534 worktree_id: snapshot.id(),
535 path: Arc::from(
536 ignored_abs_path
537 .strip_prefix(snapshot.abs_path())
538 .expect("scanning worktree-related files"),
539 ),
540 };
541 if counter_tx.send(project_path).await.is_err() {
542 return;
543 }
544 }
545 }
546 }
547 }
548 }
549
550 pub async fn handle_create_project_entry(
551 this: Model<Self>,
552 envelope: TypedEnvelope<proto::CreateProjectEntry>,
553 mut cx: AsyncAppContext,
554 ) -> Result<proto::ProjectEntryResponse> {
555 let worktree = this.update(&mut cx, |this, cx| {
556 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
557 this.worktree_for_id(worktree_id, cx)
558 .ok_or_else(|| anyhow!("worktree not found"))
559 })??;
560 Worktree::handle_create_entry(worktree, envelope.payload, cx).await
561 }
562
563 pub async fn handle_rename_project_entry(
564 this: Model<Self>,
565 envelope: TypedEnvelope<proto::RenameProjectEntry>,
566 mut cx: AsyncAppContext,
567 ) -> Result<proto::ProjectEntryResponse> {
568 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
569 let worktree = this.update(&mut cx, |this, cx| {
570 this.worktree_for_entry(entry_id, cx)
571 .ok_or_else(|| anyhow!("worktree not found"))
572 })??;
573 Worktree::handle_rename_entry(worktree, envelope.payload, cx).await
574 }
575
576 pub async fn handle_copy_project_entry(
577 this: Model<Self>,
578 envelope: TypedEnvelope<proto::CopyProjectEntry>,
579 mut cx: AsyncAppContext,
580 ) -> Result<proto::ProjectEntryResponse> {
581 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
582 let worktree = this.update(&mut cx, |this, cx| {
583 this.worktree_for_entry(entry_id, cx)
584 .ok_or_else(|| anyhow!("worktree not found"))
585 })??;
586 Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
587 }
588
589 pub async fn handle_delete_project_entry(
590 this: Model<Self>,
591 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
592 mut cx: AsyncAppContext,
593 ) -> Result<proto::ProjectEntryResponse> {
594 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
595 let worktree = this.update(&mut cx, |this, cx| {
596 this.worktree_for_entry(entry_id, cx)
597 .ok_or_else(|| anyhow!("worktree not found"))
598 })??;
599 Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
600 }
601
602 pub async fn handle_expand_project_entry(
603 this: Model<Self>,
604 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
605 mut cx: AsyncAppContext,
606 ) -> Result<proto::ExpandProjectEntryResponse> {
607 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
608 let worktree = this
609 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
610 .ok_or_else(|| anyhow!("invalid request"))?;
611 Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
612 }
613}
614
615#[derive(Clone)]
616enum WorktreeHandle {
617 Strong(Model<Worktree>),
618 Weak(WeakModel<Worktree>),
619}
620
621impl WorktreeHandle {
622 fn upgrade(&self) -> Option<Model<Worktree>> {
623 match self {
624 WorktreeHandle::Strong(handle) => Some(handle.clone()),
625 WorktreeHandle::Weak(handle) => handle.upgrade(),
626 }
627 }
628}