1use std::{
2 future::Future,
3 path::{Path, PathBuf},
4 sync::{
5 Arc,
6 atomic::{AtomicU64, AtomicUsize},
7 },
8};
9
10use anyhow::{Context as _, Result, anyhow, bail};
11use collections::HashMap;
12use fs::{Fs, copy_recursive};
13use futures::{FutureExt, future::Shared};
14use gpui::{
15 App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Global, Task,
16 WeakEntity,
17};
18use itertools::Either;
19use postage::{prelude::Stream as _, watch};
20use rpc::{
21 AnyProtoClient, ErrorExt, TypedEnvelope,
22 proto::{self, REMOTE_SERVER_PROJECT_ID},
23};
24use text::ReplicaId;
25use util::{
26 ResultExt,
27 paths::{PathStyle, RemotePathBuf, SanitizedPath},
28 rel_path::RelPath,
29};
30use worktree::{
31 CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
32 WorktreeId,
33};
34
35use crate::{ProjectPath, trusted_worktrees::TrustedWorktrees};
36
37enum WorktreeStoreState {
38 Local {
39 fs: Arc<dyn Fs>,
40 },
41 Remote {
42 upstream_client: AnyProtoClient,
43 upstream_project_id: u64,
44 path_style: PathStyle,
45 },
46}
47
48#[derive(Clone)]
49pub struct WorktreeIdCounter(Arc<AtomicU64>);
50
51impl Default for WorktreeIdCounter {
52 fn default() -> Self {
53 Self(Arc::new(AtomicU64::new(1)))
54 }
55}
56
57impl WorktreeIdCounter {
58 pub fn get(cx: &mut App) -> Self {
59 cx.default_global::<Self>().clone()
60 }
61
62 fn next(&self) -> u64 {
63 self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
64 }
65}
66
67impl Global for WorktreeIdCounter {}
68
69pub struct WorktreeStore {
70 next_entry_id: Arc<AtomicUsize>,
71 next_worktree_id: WorktreeIdCounter,
72 downstream_client: Option<(AnyProtoClient, u64)>,
73 retain_worktrees: bool,
74 worktrees: Vec<WorktreeHandle>,
75 worktrees_reordered: bool,
76 scanning_enabled: bool,
77 #[allow(clippy::type_complexity)]
78 loading_worktrees:
79 HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
80 initial_scan_complete: (watch::Sender<bool>, watch::Receiver<bool>),
81 state: WorktreeStoreState,
82}
83
84#[derive(Debug)]
85pub enum WorktreeStoreEvent {
86 WorktreeAdded(Entity<Worktree>),
87 WorktreeRemoved(EntityId, WorktreeId),
88 WorktreeReleased(EntityId, WorktreeId),
89 WorktreeOrderChanged,
90 WorktreeUpdateSent(Entity<Worktree>),
91 WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
92 WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
93 WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
94 WorktreeUpdatedRootRepoCommonDir(WorktreeId),
95}
96
97impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
98
99impl WorktreeStore {
100 pub fn init(client: &AnyProtoClient) {
101 client.add_entity_request_handler(Self::handle_create_project_entry);
102 client.add_entity_request_handler(Self::handle_copy_project_entry);
103 client.add_entity_request_handler(Self::handle_delete_project_entry);
104 client.add_entity_request_handler(Self::handle_trash_project_entry);
105 client.add_entity_request_handler(Self::handle_restore_project_entry);
106 client.add_entity_request_handler(Self::handle_expand_project_entry);
107 client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
108 }
109
110 pub fn init_remote(client: &AnyProtoClient) {
111 client.add_entity_request_handler(Self::handle_allocate_worktree_id);
112 }
113
114 pub fn local(
115 retain_worktrees: bool,
116 fs: Arc<dyn Fs>,
117 next_worktree_id: WorktreeIdCounter,
118 ) -> Self {
119 Self {
120 next_entry_id: Default::default(),
121 next_worktree_id,
122 loading_worktrees: Default::default(),
123 downstream_client: None,
124 worktrees: Vec::new(),
125 worktrees_reordered: false,
126 scanning_enabled: true,
127 retain_worktrees,
128 initial_scan_complete: watch::channel_with(true),
129 state: WorktreeStoreState::Local { fs },
130 }
131 }
132
133 pub fn remote(
134 retain_worktrees: bool,
135 upstream_client: AnyProtoClient,
136 upstream_project_id: u64,
137 path_style: PathStyle,
138 next_worktree_id: WorktreeIdCounter,
139 ) -> Self {
140 Self {
141 next_entry_id: Default::default(),
142 next_worktree_id,
143 loading_worktrees: Default::default(),
144 downstream_client: None,
145 worktrees: Vec::new(),
146 worktrees_reordered: false,
147 scanning_enabled: true,
148 retain_worktrees,
149 initial_scan_complete: watch::channel_with(true),
150 state: WorktreeStoreState::Remote {
151 upstream_client,
152 upstream_project_id,
153 path_style,
154 },
155 }
156 }
157
158 pub fn next_worktree_id(&self) -> impl Future<Output = Result<WorktreeId>> + use<> {
159 let strategy = match (&self.state, &self.downstream_client) {
160 // we are a remote server, the client is in charge of assigning worktree ids
161 (WorktreeStoreState::Local { .. }, Some((client, REMOTE_SERVER_PROJECT_ID))) => {
162 Either::Left(client.clone())
163 }
164 // we are just a local zed project, we can assign ids
165 (WorktreeStoreState::Local { .. }, _) => Either::Right(self.next_worktree_id.next()),
166 // we are connected to a remote server, we are in charge of assigning worktree ids
167 (WorktreeStoreState::Remote { .. }, _) => Either::Right(self.next_worktree_id.next()),
168 };
169 async move {
170 match strategy {
171 Either::Left(client) => Ok(client
172 .request(proto::AllocateWorktreeId {
173 project_id: REMOTE_SERVER_PROJECT_ID,
174 })
175 .await?
176 .worktree_id),
177 Either::Right(id) => Ok(id),
178 }
179 .map(WorktreeId::from_proto)
180 }
181 }
182
183 pub fn disable_scanner(&mut self) {
184 self.scanning_enabled = false;
185 *self.initial_scan_complete.0.borrow_mut() = true;
186 }
187
188 /// Returns a future that resolves when all visible worktrees have completed
189 /// their initial scan (entries populated, git repos detected).
190 pub fn wait_for_initial_scan(&self) -> impl Future<Output = ()> + use<> {
191 let mut rx = self.initial_scan_complete.1.clone();
192 async move {
193 let mut done = *rx.borrow();
194 while !done {
195 if let Some(value) = rx.recv().await {
196 done = value;
197 } else {
198 break;
199 }
200 }
201 }
202 }
203
204 /// Returns whether all visible worktrees have completed their initial scan.
205 pub fn initial_scan_completed(&self) -> bool {
206 *self.initial_scan_complete.1.borrow()
207 }
208
209 /// Checks whether all visible worktrees have completed their initial scan
210 /// and no worktree creations are pending, and updates the watch channel accordingly.
211 fn update_initial_scan_state(&mut self, cx: &App) {
212 let complete = self.loading_worktrees.is_empty()
213 && self
214 .visible_worktrees(cx)
215 .all(|wt| wt.read(cx).completed_scan_id() >= 1);
216 *self.initial_scan_complete.0.borrow_mut() = complete;
217 }
218
219 /// Spawns a detached task that waits for a worktree's initial scan to complete,
220 /// then rechecks and updates the aggregate initial scan state.
221 fn observe_worktree_scan_completion(
222 &mut self,
223 worktree: &Entity<Worktree>,
224 cx: &mut Context<Self>,
225 ) {
226 let await_scan = worktree.update(cx, |worktree, _cx| worktree.wait_for_snapshot(1));
227 cx.spawn(async move |this, cx| {
228 await_scan.await.ok();
229 this.update(cx, |this, cx| {
230 this.update_initial_scan_state(cx);
231 })
232 .ok();
233 anyhow::Ok(())
234 })
235 .detach();
236 }
237
238 /// Iterates through all worktrees, including ones that don't appear in the project panel
239 pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
240 self.worktrees
241 .iter()
242 .filter_map(move |worktree| worktree.upgrade())
243 }
244
245 /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
246 pub fn visible_worktrees<'a>(
247 &'a self,
248 cx: &'a App,
249 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
250 self.worktrees()
251 .filter(|worktree| worktree.read(cx).is_visible())
252 }
253
254 /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
255 pub fn visible_worktrees_and_single_files<'a>(
256 &'a self,
257 cx: &'a App,
258 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
259 self.worktrees()
260 .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
261 }
262
263 pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
264 self.worktrees()
265 .find(|worktree| worktree.read(cx).id() == id)
266 }
267
268 pub fn worktree_for_entry(
269 &self,
270 entry_id: ProjectEntryId,
271 cx: &App,
272 ) -> Option<Entity<Worktree>> {
273 self.worktrees()
274 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
275 }
276
277 pub fn find_worktree(
278 &self,
279 abs_path: impl AsRef<Path>,
280 cx: &App,
281 ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
282 let abs_path = SanitizedPath::new(abs_path.as_ref());
283 for tree in self.worktrees() {
284 let path_style = tree.read(cx).path_style();
285 if let Some(relative_path) =
286 path_style.strip_prefix(abs_path.as_ref(), tree.read(cx).abs_path().as_ref())
287 {
288 return Some((tree.clone(), relative_path.into_arc()));
289 }
290 }
291 None
292 }
293
294 pub fn project_path_for_absolute_path(&self, abs_path: &Path, cx: &App) -> Option<ProjectPath> {
295 self.find_worktree(abs_path, cx)
296 .map(|(worktree, relative_path)| ProjectPath {
297 worktree_id: worktree.read(cx).id(),
298 path: relative_path,
299 })
300 }
301
302 pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
303 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
304 Some(worktree.read(cx).absolutize(&project_path.path))
305 }
306
307 pub fn path_style(&self) -> PathStyle {
308 match &self.state {
309 WorktreeStoreState::Local { .. } => PathStyle::local(),
310 WorktreeStoreState::Remote { path_style, .. } => *path_style,
311 }
312 }
313
314 pub fn find_or_create_worktree(
315 &mut self,
316 abs_path: impl AsRef<Path>,
317 visible: bool,
318 cx: &mut Context<Self>,
319 ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
320 let abs_path = abs_path.as_ref();
321 if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
322 Task::ready(Ok((tree, relative_path)))
323 } else {
324 let worktree = self.create_worktree(abs_path, visible, cx);
325 cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
326 }
327 }
328
329 pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
330 self.worktrees()
331 .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
332 }
333
334 pub fn worktree_and_entry_for_id<'a>(
335 &'a self,
336 entry_id: ProjectEntryId,
337 cx: &'a App,
338 ) -> Option<(Entity<Worktree>, &'a Entry)> {
339 self.worktrees().find_map(|worktree| {
340 worktree
341 .read(cx)
342 .entry_for_id(entry_id)
343 .map(|e| (worktree.clone(), e))
344 })
345 }
346
347 pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
348 self.worktree_for_id(path.worktree_id, cx)?
349 .read(cx)
350 .entry_for_path(&path.path)
351 }
352
353 pub fn copy_entry(
354 &mut self,
355 entry_id: ProjectEntryId,
356 new_project_path: ProjectPath,
357 cx: &mut Context<Self>,
358 ) -> Task<Result<Option<Entry>>> {
359 let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
360 return Task::ready(Err(anyhow!("no such worktree")));
361 };
362 let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
363 return Task::ready(Err(anyhow!("no such entry")));
364 };
365 let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
366 return Task::ready(Err(anyhow!("no such worktree")));
367 };
368
369 match &self.state {
370 WorktreeStoreState::Local { fs } => {
371 let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
372 let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
373 let fs = fs.clone();
374 let copy = cx.background_spawn(async move {
375 copy_recursive(
376 fs.as_ref(),
377 &old_abs_path,
378 &new_abs_path,
379 Default::default(),
380 )
381 .await
382 });
383
384 cx.spawn(async move |_, cx| {
385 copy.await?;
386 new_worktree
387 .update(cx, |this, cx| {
388 this.as_local_mut().unwrap().refresh_entry(
389 new_project_path.path,
390 None,
391 cx,
392 )
393 })
394 .await
395 })
396 }
397 WorktreeStoreState::Remote {
398 upstream_client,
399 upstream_project_id,
400 ..
401 } => {
402 let response = upstream_client.request(proto::CopyProjectEntry {
403 project_id: *upstream_project_id,
404 entry_id: entry_id.to_proto(),
405 new_path: new_project_path.path.to_proto(),
406 new_worktree_id: new_project_path.worktree_id.to_proto(),
407 });
408 cx.spawn(async move |_, cx| {
409 let response = response.await?;
410 match response.entry {
411 Some(entry) => new_worktree
412 .update(cx, |worktree, cx| {
413 worktree.as_remote_mut().unwrap().insert_entry(
414 entry,
415 response.worktree_scan_id as usize,
416 cx,
417 )
418 })
419 .await
420 .map(Some),
421 None => Ok(None),
422 }
423 })
424 }
425 }
426 }
427
428 pub fn rename_entry(
429 &mut self,
430 entry_id: ProjectEntryId,
431 new_project_path: ProjectPath,
432 cx: &mut Context<Self>,
433 ) -> Task<Result<CreatedEntry>> {
434 let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
435 return Task::ready(Err(anyhow!("no such worktree")));
436 };
437 let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
438 return Task::ready(Err(anyhow!("no such entry")));
439 };
440 let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
441 return Task::ready(Err(anyhow!("no such worktree")));
442 };
443
444 match &self.state {
445 WorktreeStoreState::Local { fs } => {
446 let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
447 let new_worktree_ref = new_worktree.read(cx);
448 let is_root_entry = new_worktree_ref
449 .root_entry()
450 .is_some_and(|e| e.id == entry_id);
451 let abs_new_path = if is_root_entry {
452 let abs_path = new_worktree_ref.abs_path();
453 let Some(root_parent_path) = abs_path.parent() else {
454 return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
455 };
456 root_parent_path.join(new_project_path.path.as_std_path())
457 } else {
458 new_worktree_ref.absolutize(&new_project_path.path)
459 };
460
461 let fs = fs.clone();
462 let case_sensitive = new_worktree
463 .read(cx)
464 .as_local()
465 .unwrap()
466 .fs_is_case_sensitive();
467
468 let do_rename =
469 async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
470 fs.rename(
471 &old_path,
472 &new_path,
473 fs::RenameOptions {
474 overwrite,
475 ..fs::RenameOptions::default()
476 },
477 )
478 .await
479 .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
480 };
481
482 let rename = cx.background_spawn({
483 let abs_new_path = abs_new_path.clone();
484 async move {
485 // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
486 // we want to overwrite, because otherwise we run into a file-already-exists error.
487 let overwrite = !case_sensitive
488 && abs_old_path != abs_new_path
489 && abs_old_path.to_str().map(|p| p.to_lowercase())
490 == abs_new_path.to_str().map(|p| p.to_lowercase());
491
492 // The directory we're renaming into might not exist yet
493 if let Err(e) =
494 do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
495 {
496 if let Some(err) = e.downcast_ref::<std::io::Error>()
497 && err.kind() == std::io::ErrorKind::NotFound
498 {
499 if let Some(parent) = abs_new_path.parent() {
500 fs.create_dir(parent).await.with_context(|| {
501 format!("creating parent directory {parent:?}")
502 })?;
503 return do_rename(
504 fs.as_ref(),
505 &abs_old_path,
506 &abs_new_path,
507 overwrite,
508 )
509 .await;
510 }
511 }
512 return Err(e);
513 }
514 Ok(())
515 }
516 });
517
518 cx.spawn(async move |_, cx| {
519 rename.await?;
520 Ok(new_worktree
521 .update(cx, |this, cx| {
522 let local = this.as_local_mut().unwrap();
523 if is_root_entry {
524 // We eagerly update `abs_path` and refresh this worktree.
525 // Otherwise, the FS watcher would do it on the `RootUpdated` event,
526 // but with a noticeable delay, so we handle it proactively.
527 local.update_abs_path_and_refresh(
528 SanitizedPath::new_arc(&abs_new_path),
529 cx,
530 );
531 Task::ready(Ok(this.root_entry().cloned()))
532 } else {
533 // First refresh the parent directory (in case it was newly created)
534 if let Some(parent) = new_project_path.path.parent() {
535 let _ = local.refresh_entries_for_paths(vec![parent.into()]);
536 }
537 // Then refresh the new path
538 local.refresh_entry(
539 new_project_path.path.clone(),
540 Some(old_entry.path),
541 cx,
542 )
543 }
544 })
545 .await?
546 .map(CreatedEntry::Included)
547 .unwrap_or_else(|| CreatedEntry::Excluded {
548 abs_path: abs_new_path,
549 }))
550 })
551 }
552 WorktreeStoreState::Remote {
553 upstream_client,
554 upstream_project_id,
555 ..
556 } => {
557 let response = upstream_client.request(proto::RenameProjectEntry {
558 project_id: *upstream_project_id,
559 entry_id: entry_id.to_proto(),
560 new_path: new_project_path.path.to_proto(),
561 new_worktree_id: new_project_path.worktree_id.to_proto(),
562 });
563 cx.spawn(async move |_, cx| {
564 let response = response.await?;
565 match response.entry {
566 Some(entry) => new_worktree
567 .update(cx, |worktree, cx| {
568 worktree.as_remote_mut().unwrap().insert_entry(
569 entry,
570 response.worktree_scan_id as usize,
571 cx,
572 )
573 })
574 .await
575 .map(CreatedEntry::Included),
576 None => {
577 let abs_path = new_worktree.read_with(cx, |worktree, _| {
578 worktree.absolutize(&new_project_path.path)
579 });
580 Ok(CreatedEntry::Excluded { abs_path })
581 }
582 }
583 })
584 }
585 }
586 }
587 pub fn create_worktree(
588 &mut self,
589 abs_path: impl AsRef<Path>,
590 visible: bool,
591 cx: &mut Context<Self>,
592 ) -> Task<Result<Entity<Worktree>>> {
593 let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
594 let is_via_collab = matches!(&self.state, WorktreeStoreState::Remote { upstream_client, .. } if upstream_client.is_via_collab());
595 if !self.loading_worktrees.contains_key(&abs_path) {
596 let task = match &self.state {
597 WorktreeStoreState::Remote {
598 upstream_client,
599 path_style,
600 ..
601 } => {
602 if upstream_client.is_via_collab() {
603 Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
604 } else {
605 let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
606 self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
607 }
608 }
609 WorktreeStoreState::Local { fs } => {
610 self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
611 }
612 };
613
614 self.loading_worktrees
615 .insert(abs_path.clone(), task.shared());
616
617 if visible && self.scanning_enabled {
618 *self.initial_scan_complete.0.borrow_mut() = false;
619 }
620 }
621 let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
622 cx.spawn(async move |this, cx| {
623 let result = task.await;
624 this.update(cx, |this, cx| {
625 this.loading_worktrees.remove(&abs_path);
626 if !visible || !this.scanning_enabled || result.is_err() {
627 this.update_initial_scan_state(cx);
628 }
629 })
630 .ok();
631
632 match result {
633 Ok(worktree) => {
634 if !is_via_collab {
635 if let Some((trusted_worktrees, worktree_store)) = this
636 .update(cx, |_, cx| {
637 TrustedWorktrees::try_get_global(cx).zip(Some(cx.entity()))
638 })
639 .ok()
640 .flatten()
641 {
642 trusted_worktrees.update(cx, |trusted_worktrees, cx| {
643 trusted_worktrees.can_trust(
644 &worktree_store,
645 worktree.read(cx).id(),
646 cx,
647 );
648 });
649 }
650
651 this.update(cx, |this, cx| {
652 if this.scanning_enabled && visible {
653 this.observe_worktree_scan_completion(&worktree, cx);
654 }
655 })
656 .ok();
657 }
658 Ok(worktree)
659 }
660 Err(err) => Err((*err).cloned()),
661 }
662 })
663 }
664
665 fn create_remote_worktree(
666 &mut self,
667 client: AnyProtoClient,
668 abs_path: RemotePathBuf,
669 visible: bool,
670 cx: &mut Context<Self>,
671 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
672 let path_style = abs_path.path_style();
673 let mut abs_path = abs_path.to_string();
674 // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
675 // in which case want to strip the leading the `/`.
676 // On the host-side, the `~` will get expanded.
677 // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
678 if abs_path.starts_with("/~") {
679 abs_path = abs_path[1..].to_string();
680 }
681 if abs_path.is_empty() {
682 abs_path = "~/".to_string();
683 }
684
685 cx.spawn(async move |this, cx| {
686 let this = this.upgrade().context("Dropped worktree store")?;
687
688 let path = RemotePathBuf::new(abs_path, path_style);
689 let response = client
690 .request(proto::AddWorktree {
691 project_id: REMOTE_SERVER_PROJECT_ID,
692 path: path.to_proto(),
693 visible,
694 })
695 .await?;
696
697 if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
698 this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
699 }) {
700 return Ok(existing_worktree);
701 }
702
703 let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
704 let root_name = root_path_buf
705 .file_name()
706 .map(|n| n.to_string_lossy().into_owned())
707 .unwrap_or(root_path_buf.to_string_lossy().into_owned());
708
709 let worktree = cx.update(|cx| {
710 Worktree::remote(
711 REMOTE_SERVER_PROJECT_ID,
712 ReplicaId::REMOTE_SERVER,
713 proto::WorktreeMetadata {
714 id: response.worktree_id,
715 root_name,
716 visible,
717 abs_path: response.canonicalized_path,
718 root_repo_common_dir: response.root_repo_common_dir,
719 },
720 client,
721 path_style,
722 cx,
723 )
724 });
725
726 this.update(cx, |this, cx| {
727 this.add(&worktree, cx);
728 });
729 Ok(worktree)
730 })
731 }
732
733 fn create_local_worktree(
734 &mut self,
735 fs: Arc<dyn Fs>,
736 abs_path: Arc<SanitizedPath>,
737 visible: bool,
738 cx: &mut Context<Self>,
739 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
740 let next_entry_id = self.next_entry_id.clone();
741 let scanning_enabled = self.scanning_enabled;
742
743 let next_worktree_id = self.next_worktree_id();
744
745 cx.spawn(async move |this, cx| {
746 let worktree_id = next_worktree_id.await?;
747 let worktree = Worktree::local(
748 SanitizedPath::cast_arc(abs_path.clone()),
749 visible,
750 fs,
751 next_entry_id,
752 scanning_enabled,
753 worktree_id,
754 cx,
755 )
756 .await?;
757
758 this.update(cx, |this, cx| this.add(&worktree, cx))?;
759
760 if visible {
761 cx.update(|cx| {
762 cx.add_recent_document(abs_path.as_path());
763 });
764 }
765
766 Ok(worktree)
767 })
768 }
769
770 pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
771 let worktree_id = worktree.read(cx).id();
772 debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
773
774 let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
775 let handle = if push_strong_handle {
776 WorktreeHandle::Strong(worktree.clone())
777 } else {
778 WorktreeHandle::Weak(worktree.downgrade())
779 };
780 if self.worktrees_reordered {
781 self.worktrees.push(handle);
782 } else {
783 let i = match self
784 .worktrees
785 .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
786 other.upgrade().map(|worktree| worktree.read(cx).abs_path())
787 }) {
788 Ok(i) | Err(i) => i,
789 };
790 self.worktrees.insert(i, handle);
791 }
792
793 cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
794 self.send_project_updates(cx);
795
796 let handle_id = worktree.entity_id();
797 cx.subscribe(worktree, |_, worktree, event, cx| {
798 let worktree_id = worktree.read(cx).id();
799 match event {
800 worktree::Event::UpdatedEntries(changes) => {
801 cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
802 worktree_id,
803 changes.clone(),
804 ));
805 }
806 worktree::Event::UpdatedGitRepositories(set) => {
807 cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
808 worktree_id,
809 set.clone(),
810 ));
811 }
812 worktree::Event::DeletedEntry(id) => {
813 cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
814 }
815 worktree::Event::Deleted => {
816 // The worktree root itself has been deleted (for single-file worktrees)
817 // The worktree will be removed via the observe_release callback
818 }
819 worktree::Event::UpdatedRootRepoCommonDir => {
820 cx.emit(WorktreeStoreEvent::WorktreeUpdatedRootRepoCommonDir(
821 worktree_id,
822 ));
823 }
824 }
825 })
826 .detach();
827 cx.observe_release(worktree, move |this, worktree, cx| {
828 cx.emit(WorktreeStoreEvent::WorktreeReleased(
829 handle_id,
830 worktree.id(),
831 ));
832 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
833 handle_id,
834 worktree.id(),
835 ));
836 this.send_project_updates(cx);
837 })
838 .detach();
839 }
840
841 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
842 self.worktrees.retain(|worktree| {
843 if let Some(worktree) = worktree.upgrade() {
844 if worktree.read(cx).id() == id_to_remove {
845 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
846 worktree.entity_id(),
847 id_to_remove,
848 ));
849 false
850 } else {
851 true
852 }
853 } else {
854 false
855 }
856 });
857 self.update_initial_scan_state(cx);
858 self.send_project_updates(cx);
859 }
860
861 pub fn worktree_for_main_worktree_path(
862 &self,
863 path: &Path,
864 cx: &App,
865 ) -> Option<Entity<Worktree>> {
866 self.visible_worktrees(cx).find(|worktree| {
867 let worktree = worktree.read(cx);
868 if let Some(common_dir) = worktree.root_repo_common_dir() {
869 common_dir.parent() == Some(path)
870 } else {
871 worktree.abs_path().as_ref() == path
872 }
873 })
874 }
875
876 pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
877 self.worktrees_reordered = worktrees_reordered;
878 }
879
880 fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
881 match &self.state {
882 WorktreeStoreState::Remote {
883 upstream_client,
884 upstream_project_id,
885 ..
886 } => Some((upstream_client.clone(), *upstream_project_id)),
887 WorktreeStoreState::Local { .. } => None,
888 }
889 }
890
891 pub fn set_worktrees_from_proto(
892 &mut self,
893 worktrees: Vec<proto::WorktreeMetadata>,
894 replica_id: ReplicaId,
895 cx: &mut Context<Self>,
896 ) -> Result<()> {
897 let mut old_worktrees_by_id = self
898 .worktrees
899 .drain(..)
900 .filter_map(|worktree| {
901 let worktree = worktree.upgrade()?;
902 Some((worktree.read(cx).id(), worktree))
903 })
904 .collect::<HashMap<_, _>>();
905
906 let (client, project_id) = self.upstream_client().context("invalid project")?;
907
908 for worktree in worktrees {
909 if let Some(old_worktree) =
910 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
911 {
912 let push_strong_handle =
913 self.retain_worktrees || old_worktree.read(cx).is_visible();
914 let handle = if push_strong_handle {
915 WorktreeHandle::Strong(old_worktree.clone())
916 } else {
917 WorktreeHandle::Weak(old_worktree.downgrade())
918 };
919 self.worktrees.push(handle);
920 } else {
921 self.add(
922 &Worktree::remote(
923 project_id,
924 replica_id,
925 worktree,
926 client.clone(),
927 self.path_style(),
928 cx,
929 ),
930 cx,
931 );
932 }
933 }
934 self.send_project_updates(cx);
935
936 Ok(())
937 }
938
939 pub fn move_worktree(
940 &mut self,
941 source: WorktreeId,
942 destination: WorktreeId,
943 cx: &mut Context<Self>,
944 ) -> Result<()> {
945 if source == destination {
946 return Ok(());
947 }
948
949 let mut source_index = None;
950 let mut destination_index = None;
951 for (i, worktree) in self.worktrees.iter().enumerate() {
952 if let Some(worktree) = worktree.upgrade() {
953 let worktree_id = worktree.read(cx).id();
954 if worktree_id == source {
955 source_index = Some(i);
956 if destination_index.is_some() {
957 break;
958 }
959 } else if worktree_id == destination {
960 destination_index = Some(i);
961 if source_index.is_some() {
962 break;
963 }
964 }
965 }
966 }
967
968 let source_index =
969 source_index.with_context(|| format!("Missing worktree for id {source}"))?;
970 let destination_index =
971 destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
972
973 if source_index == destination_index {
974 return Ok(());
975 }
976
977 let worktree_to_move = self.worktrees.remove(source_index);
978 self.worktrees.insert(destination_index, worktree_to_move);
979 self.worktrees_reordered = true;
980 cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
981 cx.notify();
982 Ok(())
983 }
984
985 pub fn disconnected_from_host(&mut self, cx: &mut App) {
986 for worktree in &self.worktrees {
987 if let Some(worktree) = worktree.upgrade() {
988 worktree.update(cx, |worktree, _| {
989 if let Some(worktree) = worktree.as_remote_mut() {
990 worktree.disconnected_from_host();
991 }
992 });
993 }
994 }
995 }
996
997 pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
998 let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
999 return;
1000 };
1001
1002 let update = proto::UpdateProject {
1003 project_id,
1004 worktrees: self.worktree_metadata_protos(cx),
1005 };
1006
1007 // collab has bad concurrency guarantees, so we send requests in serial.
1008 let update_project = if downstream_client.is_via_collab() {
1009 Some(downstream_client.request(update))
1010 } else {
1011 downstream_client.send(update).log_err();
1012 None
1013 };
1014 cx.spawn(async move |this, cx| {
1015 if let Some(update_project) = update_project {
1016 update_project.await?;
1017 }
1018
1019 this.update(cx, |this, cx| {
1020 let worktrees = this.worktrees().collect::<Vec<_>>();
1021
1022 for worktree in worktrees {
1023 worktree.update(cx, |worktree, cx| {
1024 let client = downstream_client.clone();
1025 worktree.observe_updates(project_id, cx, {
1026 move |update| {
1027 let client = client.clone();
1028 async move {
1029 if client.is_via_collab() {
1030 client
1031 .request(update)
1032 .map(|result| result.log_err().is_some())
1033 .await
1034 } else {
1035 client.send(update).log_err().is_some()
1036 }
1037 }
1038 }
1039 });
1040 });
1041
1042 cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
1043 }
1044
1045 anyhow::Ok(())
1046 })
1047 })
1048 .detach_and_log_err(cx);
1049 }
1050
1051 pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
1052 self.worktrees()
1053 .map(|worktree| {
1054 let worktree = worktree.read(cx);
1055 proto::WorktreeMetadata {
1056 id: worktree.id().to_proto(),
1057 root_name: worktree.root_name_str().to_owned(),
1058 visible: worktree.is_visible(),
1059 abs_path: worktree.abs_path().to_string_lossy().into_owned(),
1060 root_repo_common_dir: worktree
1061 .root_repo_common_dir()
1062 .map(|p| p.to_string_lossy().into_owned()),
1063 }
1064 })
1065 .collect()
1066 }
1067
1068 pub fn shared(
1069 &mut self,
1070 remote_id: u64,
1071 downstream_client: AnyProtoClient,
1072 cx: &mut Context<Self>,
1073 ) {
1074 self.retain_worktrees = true;
1075 self.downstream_client = Some((downstream_client, remote_id));
1076
1077 // When shared, retain all worktrees
1078 for worktree_handle in self.worktrees.iter_mut() {
1079 match worktree_handle {
1080 WorktreeHandle::Strong(_) => {}
1081 WorktreeHandle::Weak(worktree) => {
1082 if let Some(worktree) = worktree.upgrade() {
1083 *worktree_handle = WorktreeHandle::Strong(worktree);
1084 }
1085 }
1086 }
1087 }
1088 // Only send project updates if we share in a collaborative mode.
1089 // Otherwise we are the remote server which is currently constructing
1090 // worktree store before the client actually has set up its message
1091 // handlers.
1092 if remote_id != REMOTE_SERVER_PROJECT_ID {
1093 self.send_project_updates(cx);
1094 }
1095 }
1096
1097 pub fn unshared(&mut self, cx: &mut Context<Self>) {
1098 self.retain_worktrees = false;
1099 self.downstream_client.take();
1100
1101 // When not shared, only retain the visible worktrees
1102 for worktree_handle in self.worktrees.iter_mut() {
1103 if let WorktreeHandle::Strong(worktree) = worktree_handle {
1104 let is_visible = worktree.update(cx, |worktree, _| {
1105 worktree.stop_observing_updates();
1106 worktree.is_visible()
1107 });
1108 if !is_visible {
1109 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1110 }
1111 }
1112 }
1113 }
1114
1115 pub async fn handle_create_project_entry(
1116 this: Entity<Self>,
1117 envelope: TypedEnvelope<proto::CreateProjectEntry>,
1118 mut cx: AsyncApp,
1119 ) -> Result<proto::ProjectEntryResponse> {
1120 let worktree = this.update(&mut cx, |this, cx| {
1121 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1122 this.worktree_for_id(worktree_id, cx)
1123 .context("worktree not found")
1124 })?;
1125 Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1126 }
1127
1128 pub async fn handle_copy_project_entry(
1129 this: Entity<Self>,
1130 envelope: TypedEnvelope<proto::CopyProjectEntry>,
1131 mut cx: AsyncApp,
1132 ) -> Result<proto::ProjectEntryResponse> {
1133 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1134 let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1135 let new_project_path = (
1136 new_worktree_id,
1137 RelPath::from_proto(&envelope.payload.new_path)?,
1138 );
1139 let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1140 let Some((_, project_id)) = this.downstream_client else {
1141 bail!("no downstream client")
1142 };
1143 let Some(entry) = this.entry_for_id(entry_id, cx) else {
1144 bail!("no such entry");
1145 };
1146 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1147 bail!("entry is private")
1148 }
1149
1150 let new_worktree = this
1151 .worktree_for_id(new_worktree_id, cx)
1152 .context("no such worktree")?;
1153 let scan_id = new_worktree.read(cx).scan_id();
1154 anyhow::Ok((
1155 scan_id,
1156 this.copy_entry(entry_id, new_project_path.into(), cx),
1157 ))
1158 })?;
1159 let entry = entry.await?;
1160 Ok(proto::ProjectEntryResponse {
1161 entry: entry.as_ref().map(|entry| entry.into()),
1162 worktree_scan_id: scan_id as u64,
1163 })
1164 }
1165
1166 pub async fn handle_trash_project_entry(
1167 this: Entity<Self>,
1168 envelope: TypedEnvelope<proto::TrashProjectEntry>,
1169 mut cx: AsyncApp,
1170 ) -> Result<proto::TrashProjectEntryResponse> {
1171 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1172 let worktree = this.update(&mut cx, |this, cx| {
1173 let Some((_, project_id)) = this.downstream_client else {
1174 bail!("no downstream client")
1175 };
1176 let Some(entry) = this.entry_for_id(entry_id, cx) else {
1177 bail!("no entry")
1178 };
1179 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1180 bail!("entry is private")
1181 }
1182 this.worktree_for_entry(entry_id, cx)
1183 .context("worktree not found")
1184 })?;
1185 Worktree::handle_trash_entry(worktree, envelope.payload, cx).await
1186 }
1187
1188 pub async fn handle_delete_project_entry(
1189 this: Entity<Self>,
1190 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1191 mut cx: AsyncApp,
1192 ) -> Result<proto::ProjectEntryResponse> {
1193 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1194 let worktree = this.update(&mut cx, |this, cx| {
1195 let Some((_, project_id)) = this.downstream_client else {
1196 bail!("no downstream client")
1197 };
1198 let Some(entry) = this.entry_for_id(entry_id, cx) else {
1199 bail!("no entry")
1200 };
1201 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1202 bail!("entry is private")
1203 }
1204 this.worktree_for_entry(entry_id, cx)
1205 .context("worktree not found")
1206 })?;
1207 Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1208 }
1209
1210 pub async fn handle_restore_project_entry(
1211 this: Entity<Self>,
1212 envelope: TypedEnvelope<proto::RestoreProjectEntry>,
1213 mut cx: AsyncApp,
1214 ) -> Result<proto::RestoreProjectEntryResponse> {
1215 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1216
1217 let worktree = this.update(&mut cx, |this, cx| {
1218 this.worktree_for_id(worktree_id, cx)
1219 .context("worktree not found")
1220 })?;
1221
1222 Worktree::handle_restore_entry(worktree, envelope.payload, cx).await
1223 }
1224
1225 pub async fn handle_rename_project_entry(
1226 this: Entity<Self>,
1227 request: proto::RenameProjectEntry,
1228 mut cx: AsyncApp,
1229 ) -> Result<proto::ProjectEntryResponse> {
1230 let entry_id = ProjectEntryId::from_proto(request.entry_id);
1231 let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1232 let rel_path = RelPath::from_proto(&request.new_path)
1233 .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1234
1235 let (scan_id, task) = this.update(&mut cx, |this, cx| {
1236 let worktree = this
1237 .worktree_for_entry(entry_id, cx)
1238 .context("no such worktree")?;
1239
1240 let Some((_, project_id)) = this.downstream_client else {
1241 bail!("no downstream client")
1242 };
1243 let entry = worktree
1244 .read(cx)
1245 .entry_for_id(entry_id)
1246 .ok_or_else(|| anyhow!("missing entry"))?;
1247 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1248 bail!("entry is private")
1249 }
1250
1251 let scan_id = worktree.read(cx).scan_id();
1252 anyhow::Ok((
1253 scan_id,
1254 this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1255 ))
1256 })?;
1257 Ok(proto::ProjectEntryResponse {
1258 entry: match &task.await? {
1259 CreatedEntry::Included(entry) => Some(entry.into()),
1260 CreatedEntry::Excluded { .. } => None,
1261 },
1262 worktree_scan_id: scan_id as u64,
1263 })
1264 }
1265
1266 pub async fn handle_expand_project_entry(
1267 this: Entity<Self>,
1268 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1269 mut cx: AsyncApp,
1270 ) -> Result<proto::ExpandProjectEntryResponse> {
1271 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1272 let worktree = this
1273 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))
1274 .context("invalid request")?;
1275 Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1276 }
1277
1278 pub async fn handle_expand_all_for_project_entry(
1279 this: Entity<Self>,
1280 envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1281 mut cx: AsyncApp,
1282 ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1283 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1284 let worktree = this
1285 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))
1286 .context("invalid request")?;
1287 Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1288 }
1289
1290 pub async fn handle_allocate_worktree_id(
1291 _this: Entity<Self>,
1292 _envelope: TypedEnvelope<proto::AllocateWorktreeId>,
1293 cx: AsyncApp,
1294 ) -> Result<proto::AllocateWorktreeIdResponse> {
1295 let worktree_id = cx.update(|cx| WorktreeIdCounter::get(cx).next());
1296 Ok(proto::AllocateWorktreeIdResponse { worktree_id })
1297 }
1298
1299 pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1300 match &self.state {
1301 WorktreeStoreState::Local { fs } => Some(fs.clone()),
1302 WorktreeStoreState::Remote { .. } => None,
1303 }
1304 }
1305}
1306
1307#[derive(Clone, Debug)]
1308enum WorktreeHandle {
1309 Strong(Entity<Worktree>),
1310 Weak(WeakEntity<Worktree>),
1311}
1312
1313impl WorktreeHandle {
1314 fn upgrade(&self) -> Option<Entity<Worktree>> {
1315 match self {
1316 WorktreeHandle::Strong(handle) => Some(handle.clone()),
1317 WorktreeHandle::Weak(handle) => handle.upgrade(),
1318 }
1319 }
1320}