1use std::{
2 cmp,
3 collections::VecDeque,
4 path::PathBuf,
5 sync::{
6 atomic::{AtomicUsize, Ordering::SeqCst},
7 Arc,
8 },
9};
10
11use anyhow::{anyhow, Context as _, Result};
12use collections::{HashMap, HashSet};
13use fs::Fs;
14use gpui::{AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, WeakModel};
15use rpc::{
16 proto::{self, AnyProtoClient},
17 TypedEnvelope,
18};
19use smol::{
20 channel::{Receiver, Sender},
21 lock::Semaphore,
22 stream::StreamExt,
23};
24use text::ReplicaId;
25use util::ResultExt;
26use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeId, WorktreeSettings};
27
28use crate::{search::SearchQuery, ProjectPath};
29
30pub struct WorktreeStore {
31 is_shared: bool,
32 worktrees: Vec<WorktreeHandle>,
33 worktrees_reordered: bool,
34}
35
36pub enum WorktreeStoreEvent {
37 WorktreeAdded(Model<Worktree>),
38 WorktreeRemoved(EntityId, WorktreeId),
39 WorktreeOrderChanged,
40}
41
42impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
43
44impl WorktreeStore {
45 pub fn new(retain_worktrees: bool) -> Self {
46 Self {
47 is_shared: retain_worktrees,
48 worktrees: Vec::new(),
49 worktrees_reordered: false,
50 }
51 }
52
53 /// Iterates through all worktrees, including ones that don't appear in the project panel
54 pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
55 self.worktrees
56 .iter()
57 .filter_map(move |worktree| worktree.upgrade())
58 }
59
60 /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
61 pub fn visible_worktrees<'a>(
62 &'a self,
63 cx: &'a AppContext,
64 ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
65 self.worktrees()
66 .filter(|worktree| worktree.read(cx).is_visible())
67 }
68
69 pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
70 self.worktrees()
71 .find(|worktree| worktree.read(cx).id() == id)
72 }
73
74 pub fn worktree_for_entry(
75 &self,
76 entry_id: ProjectEntryId,
77 cx: &AppContext,
78 ) -> Option<Model<Worktree>> {
79 self.worktrees()
80 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
81 }
82
83 pub fn entry_for_id<'a>(
84 &'a self,
85 entry_id: ProjectEntryId,
86 cx: &'a AppContext,
87 ) -> Option<&'a Entry> {
88 self.worktrees()
89 .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
90 }
91
92 pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
93 let push_strong_handle = self.is_shared || worktree.read(cx).is_visible();
94 let handle = if push_strong_handle {
95 WorktreeHandle::Strong(worktree.clone())
96 } else {
97 WorktreeHandle::Weak(worktree.downgrade())
98 };
99 if self.worktrees_reordered {
100 self.worktrees.push(handle);
101 } else {
102 let i = match self
103 .worktrees
104 .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
105 other.upgrade().map(|worktree| worktree.read(cx).abs_path())
106 }) {
107 Ok(i) | Err(i) => i,
108 };
109 self.worktrees.insert(i, handle);
110 }
111
112 cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
113
114 let handle_id = worktree.entity_id();
115 cx.observe_release(worktree, move |_, worktree, cx| {
116 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
117 handle_id,
118 worktree.id(),
119 ));
120 })
121 .detach();
122 }
123
124 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
125 self.worktrees.retain(|worktree| {
126 if let Some(worktree) = worktree.upgrade() {
127 if worktree.read(cx).id() == id_to_remove {
128 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
129 worktree.entity_id(),
130 id_to_remove,
131 ));
132 false
133 } else {
134 true
135 }
136 } else {
137 false
138 }
139 });
140 }
141
142 pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
143 self.worktrees_reordered = worktrees_reordered;
144 }
145
146 pub fn set_worktrees_from_proto(
147 &mut self,
148 worktrees: Vec<proto::WorktreeMetadata>,
149 replica_id: ReplicaId,
150 remote_id: u64,
151 client: AnyProtoClient,
152 cx: &mut ModelContext<Self>,
153 ) -> Result<()> {
154 let mut old_worktrees_by_id = self
155 .worktrees
156 .drain(..)
157 .filter_map(|worktree| {
158 let worktree = worktree.upgrade()?;
159 Some((worktree.read(cx).id(), worktree))
160 })
161 .collect::<HashMap<_, _>>();
162
163 for worktree in worktrees {
164 if let Some(old_worktree) =
165 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
166 {
167 self.worktrees.push(WorktreeHandle::Strong(old_worktree));
168 } else {
169 self.add(
170 &Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx),
171 cx,
172 );
173 }
174 }
175
176 Ok(())
177 }
178
179 pub fn move_worktree(
180 &mut self,
181 source: WorktreeId,
182 destination: WorktreeId,
183 cx: &mut ModelContext<Self>,
184 ) -> Result<()> {
185 if source == destination {
186 return Ok(());
187 }
188
189 let mut source_index = None;
190 let mut destination_index = None;
191 for (i, worktree) in self.worktrees.iter().enumerate() {
192 if let Some(worktree) = worktree.upgrade() {
193 let worktree_id = worktree.read(cx).id();
194 if worktree_id == source {
195 source_index = Some(i);
196 if destination_index.is_some() {
197 break;
198 }
199 } else if worktree_id == destination {
200 destination_index = Some(i);
201 if source_index.is_some() {
202 break;
203 }
204 }
205 }
206 }
207
208 let source_index =
209 source_index.with_context(|| format!("Missing worktree for id {source}"))?;
210 let destination_index =
211 destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
212
213 if source_index == destination_index {
214 return Ok(());
215 }
216
217 let worktree_to_move = self.worktrees.remove(source_index);
218 self.worktrees.insert(destination_index, worktree_to_move);
219 self.worktrees_reordered = true;
220 cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
221 cx.notify();
222 Ok(())
223 }
224
225 pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
226 for worktree in &self.worktrees {
227 if let Some(worktree) = worktree.upgrade() {
228 worktree.update(cx, |worktree, _| {
229 if let Some(worktree) = worktree.as_remote_mut() {
230 worktree.disconnected_from_host();
231 }
232 });
233 }
234 }
235 }
236
237 pub fn set_shared(&mut self, is_shared: bool, cx: &mut ModelContext<Self>) {
238 self.is_shared = is_shared;
239
240 // When shared, retain all worktrees
241 if is_shared {
242 for worktree_handle in self.worktrees.iter_mut() {
243 match worktree_handle {
244 WorktreeHandle::Strong(_) => {}
245 WorktreeHandle::Weak(worktree) => {
246 if let Some(worktree) = worktree.upgrade() {
247 *worktree_handle = WorktreeHandle::Strong(worktree);
248 }
249 }
250 }
251 }
252 }
253 // When not shared, only retain the visible worktrees
254 else {
255 for worktree_handle in self.worktrees.iter_mut() {
256 if let WorktreeHandle::Strong(worktree) = worktree_handle {
257 let is_visible = worktree.update(cx, |worktree, _| {
258 worktree.stop_observing_updates();
259 worktree.is_visible()
260 });
261 if !is_visible {
262 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
263 }
264 }
265 }
266 }
267 }
268
269 /// search over all worktrees (ignoring open buffers)
270 /// the query is tested against the file on disk and matching files are returned.
271 pub fn find_search_candidates(
272 &self,
273 query: SearchQuery,
274 limit: usize,
275 skip_entries: HashSet<ProjectEntryId>,
276 fs: Arc<dyn Fs>,
277 cx: &ModelContext<Self>,
278 ) -> Receiver<ProjectPath> {
279 let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024);
280 let snapshots = self
281 .visible_worktrees(cx)
282 .filter_map(|tree| {
283 let tree = tree.read(cx);
284 Some((tree.snapshot(), tree.as_local()?.settings()))
285 })
286 .collect::<Vec<_>>();
287 let include_root = snapshots.len() > 1;
288 let path_count: usize = snapshots
289 .iter()
290 .map(|(snapshot, _)| {
291 if query.include_ignored() {
292 snapshot.file_count()
293 } else {
294 snapshot.visible_file_count()
295 }
296 })
297 .sum();
298
299 let remaining_paths = AtomicUsize::new(limit);
300 if path_count == 0 {
301 return matching_paths_rx;
302 }
303 let workers = cx.background_executor().num_cpus().min(path_count);
304 let paths_per_worker = (path_count + workers - 1) / workers;
305
306 let executor = cx.background_executor().clone();
307 cx.background_executor()
308 .spawn(async move {
309 let fs = &fs;
310 let query = &query;
311 let matching_paths_tx = &matching_paths_tx;
312 let snapshots = &snapshots;
313 let remaining_paths = &remaining_paths;
314
315 executor
316 .scoped(move |scope| {
317 let max_concurrent_workers = Arc::new(Semaphore::new(workers));
318
319 for worker_ix in 0..workers {
320 let snapshots = snapshots.clone();
321 let worker_start_ix = worker_ix * paths_per_worker;
322 let worker_end_ix = worker_start_ix + paths_per_worker;
323 let skip_entries = skip_entries.clone();
324 let limiter = Arc::clone(&max_concurrent_workers);
325 scope.spawn(async move {
326 let _guard = limiter.acquire().await;
327 Self::search_snapshots(
328 &snapshots,
329 worker_start_ix,
330 worker_end_ix,
331 &query,
332 remaining_paths,
333 &matching_paths_tx,
334 &skip_entries,
335 include_root,
336 fs,
337 )
338 .await;
339 });
340 }
341
342 if query.include_ignored() {
343 for (snapshot, settings) in snapshots {
344 for ignored_entry in
345 snapshot.entries(true, 0).filter(|e| e.is_ignored)
346 {
347 let limiter = Arc::clone(&max_concurrent_workers);
348 scope.spawn(async move {
349 let _guard = limiter.acquire().await;
350 if remaining_paths.load(SeqCst) == 0 {
351 return;
352 }
353
354 Self::search_ignored_entry(
355 &snapshot,
356 &settings,
357 ignored_entry,
358 &fs,
359 &query,
360 remaining_paths,
361 &matching_paths_tx,
362 )
363 .await;
364 });
365 }
366 }
367 }
368 })
369 .await
370 })
371 .detach();
372 return matching_paths_rx;
373 }
374
375 #[allow(clippy::too_many_arguments)]
376 async fn search_snapshots(
377 snapshots: &Vec<(worktree::Snapshot, WorktreeSettings)>,
378 worker_start_ix: usize,
379 worker_end_ix: usize,
380 query: &SearchQuery,
381 remaining_paths: &AtomicUsize,
382 results_tx: &Sender<ProjectPath>,
383 skip_entries: &HashSet<ProjectEntryId>,
384 include_root: bool,
385 fs: &Arc<dyn Fs>,
386 ) {
387 let mut snapshot_start_ix = 0;
388 let mut abs_path = PathBuf::new();
389
390 for (snapshot, _) in snapshots {
391 let snapshot_end_ix = snapshot_start_ix
392 + if query.include_ignored() {
393 snapshot.file_count()
394 } else {
395 snapshot.visible_file_count()
396 };
397 if worker_end_ix <= snapshot_start_ix {
398 break;
399 } else if worker_start_ix > snapshot_end_ix {
400 snapshot_start_ix = snapshot_end_ix;
401 continue;
402 } else {
403 let start_in_snapshot = worker_start_ix.saturating_sub(snapshot_start_ix);
404 let end_in_snapshot = cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix;
405
406 for entry in snapshot
407 .files(false, start_in_snapshot)
408 .take(end_in_snapshot - start_in_snapshot)
409 {
410 if results_tx.is_closed() {
411 break;
412 }
413 if skip_entries.contains(&entry.id) {
414 continue;
415 }
416 if entry.is_fifo {
417 continue;
418 }
419
420 let matched_path = if include_root {
421 let mut full_path = PathBuf::from(snapshot.root_name());
422 full_path.push(&entry.path);
423 query.file_matches(Some(&full_path))
424 } else {
425 query.file_matches(Some(&entry.path))
426 };
427
428 let matches = if matched_path {
429 abs_path.clear();
430 abs_path.push(&snapshot.abs_path());
431 abs_path.push(&entry.path);
432 if let Some(file) = fs.open_sync(&abs_path).await.log_err() {
433 query.detect(file).unwrap_or(false)
434 } else {
435 false
436 }
437 } else {
438 false
439 };
440
441 if matches {
442 if remaining_paths
443 .fetch_update(SeqCst, SeqCst, |value| {
444 if value > 0 {
445 Some(value - 1)
446 } else {
447 None
448 }
449 })
450 .is_err()
451 {
452 return;
453 }
454
455 let project_path = ProjectPath {
456 worktree_id: snapshot.id(),
457 path: entry.path.clone(),
458 };
459 if results_tx.send(project_path).await.is_err() {
460 return;
461 }
462 }
463 }
464
465 snapshot_start_ix = snapshot_end_ix;
466 }
467 }
468 }
469
470 async fn search_ignored_entry(
471 snapshot: &Snapshot,
472 settings: &WorktreeSettings,
473 ignored_entry: &Entry,
474 fs: &Arc<dyn Fs>,
475 query: &SearchQuery,
476 remaining_paths: &AtomicUsize,
477 counter_tx: &Sender<ProjectPath>,
478 ) {
479 let mut ignored_paths_to_process =
480 VecDeque::from([snapshot.abs_path().join(&ignored_entry.path)]);
481
482 while let Some(ignored_abs_path) = ignored_paths_to_process.pop_front() {
483 let metadata = fs
484 .metadata(&ignored_abs_path)
485 .await
486 .with_context(|| format!("fetching fs metadata for {ignored_abs_path:?}"))
487 .log_err()
488 .flatten();
489
490 if let Some(fs_metadata) = metadata {
491 if fs_metadata.is_dir {
492 let files = fs
493 .read_dir(&ignored_abs_path)
494 .await
495 .with_context(|| format!("listing ignored path {ignored_abs_path:?}"))
496 .log_err();
497
498 if let Some(mut subfiles) = files {
499 while let Some(subfile) = subfiles.next().await {
500 if let Some(subfile) = subfile.log_err() {
501 ignored_paths_to_process.push_back(subfile);
502 }
503 }
504 }
505 } else if !fs_metadata.is_symlink {
506 if !query.file_matches(Some(&ignored_abs_path))
507 || settings.is_path_excluded(&ignored_entry.path)
508 {
509 continue;
510 }
511 let matches = if let Some(file) = fs
512 .open_sync(&ignored_abs_path)
513 .await
514 .with_context(|| format!("Opening ignored path {ignored_abs_path:?}"))
515 .log_err()
516 {
517 query.detect(file).unwrap_or(false)
518 } else {
519 false
520 };
521
522 if matches {
523 if remaining_paths
524 .fetch_update(SeqCst, SeqCst, |value| {
525 if value > 0 {
526 Some(value - 1)
527 } else {
528 None
529 }
530 })
531 .is_err()
532 {
533 return;
534 }
535
536 let project_path = ProjectPath {
537 worktree_id: snapshot.id(),
538 path: Arc::from(
539 ignored_abs_path
540 .strip_prefix(snapshot.abs_path())
541 .expect("scanning worktree-related files"),
542 ),
543 };
544 if counter_tx.send(project_path).await.is_err() {
545 return;
546 }
547 }
548 }
549 }
550 }
551 }
552
553 pub async fn handle_create_project_entry(
554 this: Model<Self>,
555 envelope: TypedEnvelope<proto::CreateProjectEntry>,
556 mut cx: AsyncAppContext,
557 ) -> Result<proto::ProjectEntryResponse> {
558 let worktree = this.update(&mut cx, |this, cx| {
559 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
560 this.worktree_for_id(worktree_id, cx)
561 .ok_or_else(|| anyhow!("worktree not found"))
562 })??;
563 Worktree::handle_create_entry(worktree, envelope.payload, cx).await
564 }
565
566 pub async fn handle_rename_project_entry(
567 this: Model<Self>,
568 envelope: TypedEnvelope<proto::RenameProjectEntry>,
569 mut cx: AsyncAppContext,
570 ) -> Result<proto::ProjectEntryResponse> {
571 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
572 let worktree = this.update(&mut cx, |this, cx| {
573 this.worktree_for_entry(entry_id, cx)
574 .ok_or_else(|| anyhow!("worktree not found"))
575 })??;
576 Worktree::handle_rename_entry(worktree, envelope.payload, cx).await
577 }
578
579 pub async fn handle_copy_project_entry(
580 this: Model<Self>,
581 envelope: TypedEnvelope<proto::CopyProjectEntry>,
582 mut cx: AsyncAppContext,
583 ) -> Result<proto::ProjectEntryResponse> {
584 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
585 let worktree = this.update(&mut cx, |this, cx| {
586 this.worktree_for_entry(entry_id, cx)
587 .ok_or_else(|| anyhow!("worktree not found"))
588 })??;
589 Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
590 }
591
592 pub async fn handle_delete_project_entry(
593 this: Model<Self>,
594 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
595 mut cx: AsyncAppContext,
596 ) -> Result<proto::ProjectEntryResponse> {
597 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
598 let worktree = this.update(&mut cx, |this, cx| {
599 this.worktree_for_entry(entry_id, cx)
600 .ok_or_else(|| anyhow!("worktree not found"))
601 })??;
602 Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
603 }
604
605 pub async fn handle_expand_project_entry(
606 this: Model<Self>,
607 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
608 mut cx: AsyncAppContext,
609 ) -> Result<proto::ExpandProjectEntryResponse> {
610 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
611 let worktree = this
612 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
613 .ok_or_else(|| anyhow!("invalid request"))?;
614 Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
615 }
616}
617
618#[derive(Clone)]
619enum WorktreeHandle {
620 Strong(Model<Worktree>),
621 Weak(WeakModel<Worktree>),
622}
623
624impl WorktreeHandle {
625 fn upgrade(&self) -> Option<Model<Worktree>> {
626 match self {
627 WorktreeHandle::Strong(handle) => Some(handle.clone()),
628 WorktreeHandle::Weak(handle) => handle.upgrade(),
629 }
630 }
631}