1use std::{
2 future::Future,
3 path::{Path, PathBuf},
4 sync::{
5 Arc,
6 atomic::{AtomicU64, AtomicUsize},
7 },
8};
9
10use anyhow::{Context as _, Result, anyhow, bail};
11use collections::HashMap;
12use fs::{Fs, copy_recursive};
13use futures::{FutureExt, future::Shared};
14use gpui::{
15 App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Global, Task,
16 WeakEntity,
17};
18use itertools::Either;
19use postage::{prelude::Stream as _, watch};
20use rpc::{
21 AnyProtoClient, ErrorExt, TypedEnvelope,
22 proto::{self, REMOTE_SERVER_PROJECT_ID},
23};
24use text::ReplicaId;
25use util::{
26 ResultExt,
27 paths::{PathStyle, RemotePathBuf, SanitizedPath},
28 rel_path::RelPath,
29};
30use worktree::{
31 CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
32 WorktreeId,
33};
34
35use crate::{ProjectPath, trusted_worktrees::TrustedWorktrees};
36
37enum WorktreeStoreState {
38 Local {
39 fs: Arc<dyn Fs>,
40 },
41 Remote {
42 upstream_client: AnyProtoClient,
43 upstream_project_id: u64,
44 path_style: PathStyle,
45 },
46}
47
48#[derive(Clone)]
49pub struct WorktreeIdCounter(Arc<AtomicU64>);
50
51impl Default for WorktreeIdCounter {
52 fn default() -> Self {
53 Self(Arc::new(AtomicU64::new(1)))
54 }
55}
56
57impl WorktreeIdCounter {
58 pub fn get(cx: &mut App) -> Self {
59 cx.default_global::<Self>().clone()
60 }
61
62 fn next(&self) -> u64 {
63 self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
64 }
65}
66
67impl Global for WorktreeIdCounter {}
68
69pub struct WorktreeStore {
70 next_entry_id: Arc<AtomicUsize>,
71 next_worktree_id: WorktreeIdCounter,
72 downstream_client: Option<(AnyProtoClient, u64)>,
73 retain_worktrees: bool,
74 worktrees: Vec<WorktreeHandle>,
75 worktrees_reordered: bool,
76 scanning_enabled: bool,
77 #[allow(clippy::type_complexity)]
78 loading_worktrees:
79 HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
80 initial_scan_complete: (watch::Sender<bool>, watch::Receiver<bool>),
81 state: WorktreeStoreState,
82}
83
84#[derive(Debug)]
85pub enum WorktreeStoreEvent {
86 WorktreeAdded(Entity<Worktree>),
87 WorktreeRemoved(EntityId, WorktreeId),
88 WorktreeReleased(EntityId, WorktreeId),
89 WorktreeOrderChanged,
90 WorktreeUpdateSent(Entity<Worktree>),
91 WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
92 WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
93 WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
94 WorktreeUpdatedRootRepoCommonDir(WorktreeId),
95}
96
97impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
98
99impl WorktreeStore {
100 pub fn init(client: &AnyProtoClient) {
101 client.add_entity_request_handler(Self::handle_create_project_entry);
102 client.add_entity_request_handler(Self::handle_copy_project_entry);
103 client.add_entity_request_handler(Self::handle_delete_project_entry);
104 client.add_entity_request_handler(Self::handle_expand_project_entry);
105 client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
106 }
107
108 pub fn init_remote(client: &AnyProtoClient) {
109 client.add_entity_request_handler(Self::handle_allocate_worktree_id);
110 }
111
112 pub fn local(
113 retain_worktrees: bool,
114 fs: Arc<dyn Fs>,
115 next_worktree_id: WorktreeIdCounter,
116 ) -> Self {
117 Self {
118 next_entry_id: Default::default(),
119 next_worktree_id,
120 loading_worktrees: Default::default(),
121 downstream_client: None,
122 worktrees: Vec::new(),
123 worktrees_reordered: false,
124 scanning_enabled: true,
125 retain_worktrees,
126 initial_scan_complete: watch::channel_with(true),
127 state: WorktreeStoreState::Local { fs },
128 }
129 }
130
131 pub fn remote(
132 retain_worktrees: bool,
133 upstream_client: AnyProtoClient,
134 upstream_project_id: u64,
135 path_style: PathStyle,
136 next_worktree_id: WorktreeIdCounter,
137 ) -> Self {
138 Self {
139 next_entry_id: Default::default(),
140 next_worktree_id,
141 loading_worktrees: Default::default(),
142 downstream_client: None,
143 worktrees: Vec::new(),
144 worktrees_reordered: false,
145 scanning_enabled: true,
146 retain_worktrees,
147 initial_scan_complete: watch::channel_with(true),
148 state: WorktreeStoreState::Remote {
149 upstream_client,
150 upstream_project_id,
151 path_style,
152 },
153 }
154 }
155
156 pub fn next_worktree_id(&self) -> impl Future<Output = Result<WorktreeId>> + use<> {
157 let strategy = match (&self.state, &self.downstream_client) {
158 // we are a remote server, the client is in charge of assigning worktree ids
159 (WorktreeStoreState::Local { .. }, Some((client, REMOTE_SERVER_PROJECT_ID))) => {
160 Either::Left(client.clone())
161 }
162 // we are just a local zed project, we can assign ids
163 (WorktreeStoreState::Local { .. }, _) => Either::Right(self.next_worktree_id.next()),
164 // we are connected to a remote server, we are in charge of assigning worktree ids
165 (WorktreeStoreState::Remote { .. }, _) => Either::Right(self.next_worktree_id.next()),
166 };
167 async move {
168 match strategy {
169 Either::Left(client) => Ok(client
170 .request(proto::AllocateWorktreeId {
171 project_id: REMOTE_SERVER_PROJECT_ID,
172 })
173 .await?
174 .worktree_id),
175 Either::Right(id) => Ok(id),
176 }
177 .map(WorktreeId::from_proto)
178 }
179 }
180
181 pub fn disable_scanner(&mut self) {
182 self.scanning_enabled = false;
183 *self.initial_scan_complete.0.borrow_mut() = true;
184 }
185
186 /// Returns a future that resolves when all visible worktrees have completed
187 /// their initial scan (entries populated, git repos detected).
188 pub fn wait_for_initial_scan(&self) -> impl Future<Output = ()> + use<> {
189 let mut rx = self.initial_scan_complete.1.clone();
190 async move {
191 let mut done = *rx.borrow();
192 while !done {
193 if let Some(value) = rx.recv().await {
194 done = value;
195 } else {
196 break;
197 }
198 }
199 }
200 }
201
202 /// Returns whether all visible worktrees have completed their initial scan.
203 pub fn initial_scan_completed(&self) -> bool {
204 *self.initial_scan_complete.1.borrow()
205 }
206
207 /// Checks whether all visible worktrees have completed their initial scan
208 /// and no worktree creations are pending, and updates the watch channel accordingly.
209 fn update_initial_scan_state(&mut self, cx: &App) {
210 let complete = self.loading_worktrees.is_empty()
211 && self
212 .visible_worktrees(cx)
213 .all(|wt| wt.read(cx).completed_scan_id() >= 1);
214 *self.initial_scan_complete.0.borrow_mut() = complete;
215 }
216
217 /// Spawns a detached task that waits for a worktree's initial scan to complete,
218 /// then rechecks and updates the aggregate initial scan state.
219 fn observe_worktree_scan_completion(
220 &mut self,
221 worktree: &Entity<Worktree>,
222 cx: &mut Context<Self>,
223 ) {
224 let await_scan = worktree.update(cx, |worktree, _cx| worktree.wait_for_snapshot(1));
225 cx.spawn(async move |this, cx| {
226 await_scan.await.ok();
227 this.update(cx, |this, cx| {
228 this.update_initial_scan_state(cx);
229 })
230 .ok();
231 anyhow::Ok(())
232 })
233 .detach();
234 }
235
236 /// Iterates through all worktrees, including ones that don't appear in the project panel
237 pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
238 self.worktrees
239 .iter()
240 .filter_map(move |worktree| worktree.upgrade())
241 }
242
243 /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
244 pub fn visible_worktrees<'a>(
245 &'a self,
246 cx: &'a App,
247 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
248 self.worktrees()
249 .filter(|worktree| worktree.read(cx).is_visible())
250 }
251
252 /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
253 pub fn visible_worktrees_and_single_files<'a>(
254 &'a self,
255 cx: &'a App,
256 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
257 self.worktrees()
258 .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
259 }
260
261 pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
262 self.worktrees()
263 .find(|worktree| worktree.read(cx).id() == id)
264 }
265
266 pub fn worktree_for_entry(
267 &self,
268 entry_id: ProjectEntryId,
269 cx: &App,
270 ) -> Option<Entity<Worktree>> {
271 self.worktrees()
272 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
273 }
274
275 pub fn find_worktree(
276 &self,
277 abs_path: impl AsRef<Path>,
278 cx: &App,
279 ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
280 let abs_path = SanitizedPath::new(abs_path.as_ref());
281 for tree in self.worktrees() {
282 let path_style = tree.read(cx).path_style();
283 if let Some(relative_path) =
284 path_style.strip_prefix(abs_path.as_ref(), tree.read(cx).abs_path().as_ref())
285 {
286 return Some((tree.clone(), relative_path.into_arc()));
287 }
288 }
289 None
290 }
291
292 pub fn project_path_for_absolute_path(&self, abs_path: &Path, cx: &App) -> Option<ProjectPath> {
293 self.find_worktree(abs_path, cx)
294 .map(|(worktree, relative_path)| ProjectPath {
295 worktree_id: worktree.read(cx).id(),
296 path: relative_path,
297 })
298 }
299
300 pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
301 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
302 Some(worktree.read(cx).absolutize(&project_path.path))
303 }
304
305 pub fn path_style(&self) -> PathStyle {
306 match &self.state {
307 WorktreeStoreState::Local { .. } => PathStyle::local(),
308 WorktreeStoreState::Remote { path_style, .. } => *path_style,
309 }
310 }
311
312 pub fn find_or_create_worktree(
313 &mut self,
314 abs_path: impl AsRef<Path>,
315 visible: bool,
316 cx: &mut Context<Self>,
317 ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
318 let abs_path = abs_path.as_ref();
319 if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
320 Task::ready(Ok((tree, relative_path)))
321 } else {
322 let worktree = self.create_worktree(abs_path, visible, cx);
323 cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
324 }
325 }
326
327 pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
328 self.worktrees()
329 .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
330 }
331
332 pub fn worktree_and_entry_for_id<'a>(
333 &'a self,
334 entry_id: ProjectEntryId,
335 cx: &'a App,
336 ) -> Option<(Entity<Worktree>, &'a Entry)> {
337 self.worktrees().find_map(|worktree| {
338 worktree
339 .read(cx)
340 .entry_for_id(entry_id)
341 .map(|e| (worktree.clone(), e))
342 })
343 }
344
345 pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
346 self.worktree_for_id(path.worktree_id, cx)?
347 .read(cx)
348 .entry_for_path(&path.path)
349 }
350
351 pub fn copy_entry(
352 &mut self,
353 entry_id: ProjectEntryId,
354 new_project_path: ProjectPath,
355 cx: &mut Context<Self>,
356 ) -> Task<Result<Option<Entry>>> {
357 let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
358 return Task::ready(Err(anyhow!("no such worktree")));
359 };
360 let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
361 return Task::ready(Err(anyhow!("no such entry")));
362 };
363 let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
364 return Task::ready(Err(anyhow!("no such worktree")));
365 };
366
367 match &self.state {
368 WorktreeStoreState::Local { fs } => {
369 let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
370 let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
371 let fs = fs.clone();
372 let copy = cx.background_spawn(async move {
373 copy_recursive(
374 fs.as_ref(),
375 &old_abs_path,
376 &new_abs_path,
377 Default::default(),
378 )
379 .await
380 });
381
382 cx.spawn(async move |_, cx| {
383 copy.await?;
384 new_worktree
385 .update(cx, |this, cx| {
386 this.as_local_mut().unwrap().refresh_entry(
387 new_project_path.path,
388 None,
389 cx,
390 )
391 })
392 .await
393 })
394 }
395 WorktreeStoreState::Remote {
396 upstream_client,
397 upstream_project_id,
398 ..
399 } => {
400 let response = upstream_client.request(proto::CopyProjectEntry {
401 project_id: *upstream_project_id,
402 entry_id: entry_id.to_proto(),
403 new_path: new_project_path.path.to_proto(),
404 new_worktree_id: new_project_path.worktree_id.to_proto(),
405 });
406 cx.spawn(async move |_, cx| {
407 let response = response.await?;
408 match response.entry {
409 Some(entry) => new_worktree
410 .update(cx, |worktree, cx| {
411 worktree.as_remote_mut().unwrap().insert_entry(
412 entry,
413 response.worktree_scan_id as usize,
414 cx,
415 )
416 })
417 .await
418 .map(Some),
419 None => Ok(None),
420 }
421 })
422 }
423 }
424 }
425
426 pub fn rename_entry(
427 &mut self,
428 entry_id: ProjectEntryId,
429 new_project_path: ProjectPath,
430 cx: &mut Context<Self>,
431 ) -> Task<Result<CreatedEntry>> {
432 let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
433 return Task::ready(Err(anyhow!("no such worktree")));
434 };
435 let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
436 return Task::ready(Err(anyhow!("no such entry")));
437 };
438 let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
439 return Task::ready(Err(anyhow!("no such worktree")));
440 };
441
442 match &self.state {
443 WorktreeStoreState::Local { fs } => {
444 let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
445 let new_worktree_ref = new_worktree.read(cx);
446 let is_root_entry = new_worktree_ref
447 .root_entry()
448 .is_some_and(|e| e.id == entry_id);
449 let abs_new_path = if is_root_entry {
450 let abs_path = new_worktree_ref.abs_path();
451 let Some(root_parent_path) = abs_path.parent() else {
452 return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
453 };
454 root_parent_path.join(new_project_path.path.as_std_path())
455 } else {
456 new_worktree_ref.absolutize(&new_project_path.path)
457 };
458
459 let fs = fs.clone();
460 let case_sensitive = new_worktree
461 .read(cx)
462 .as_local()
463 .unwrap()
464 .fs_is_case_sensitive();
465
466 let do_rename =
467 async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
468 fs.rename(
469 &old_path,
470 &new_path,
471 fs::RenameOptions {
472 overwrite,
473 ..fs::RenameOptions::default()
474 },
475 )
476 .await
477 .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
478 };
479
480 let rename = cx.background_spawn({
481 let abs_new_path = abs_new_path.clone();
482 async move {
483 // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
484 // we want to overwrite, because otherwise we run into a file-already-exists error.
485 let overwrite = !case_sensitive
486 && abs_old_path != abs_new_path
487 && abs_old_path.to_str().map(|p| p.to_lowercase())
488 == abs_new_path.to_str().map(|p| p.to_lowercase());
489
490 // The directory we're renaming into might not exist yet
491 if let Err(e) =
492 do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
493 {
494 if let Some(err) = e.downcast_ref::<std::io::Error>()
495 && err.kind() == std::io::ErrorKind::NotFound
496 {
497 if let Some(parent) = abs_new_path.parent() {
498 fs.create_dir(parent).await.with_context(|| {
499 format!("creating parent directory {parent:?}")
500 })?;
501 return do_rename(
502 fs.as_ref(),
503 &abs_old_path,
504 &abs_new_path,
505 overwrite,
506 )
507 .await;
508 }
509 }
510 return Err(e);
511 }
512 Ok(())
513 }
514 });
515
516 cx.spawn(async move |_, cx| {
517 rename.await?;
518 Ok(new_worktree
519 .update(cx, |this, cx| {
520 let local = this.as_local_mut().unwrap();
521 if is_root_entry {
522 // We eagerly update `abs_path` and refresh this worktree.
523 // Otherwise, the FS watcher would do it on the `RootUpdated` event,
524 // but with a noticeable delay, so we handle it proactively.
525 local.update_abs_path_and_refresh(
526 SanitizedPath::new_arc(&abs_new_path),
527 cx,
528 );
529 Task::ready(Ok(this.root_entry().cloned()))
530 } else {
531 // First refresh the parent directory (in case it was newly created)
532 if let Some(parent) = new_project_path.path.parent() {
533 let _ = local.refresh_entries_for_paths(vec![parent.into()]);
534 }
535 // Then refresh the new path
536 local.refresh_entry(
537 new_project_path.path.clone(),
538 Some(old_entry.path),
539 cx,
540 )
541 }
542 })
543 .await?
544 .map(CreatedEntry::Included)
545 .unwrap_or_else(|| CreatedEntry::Excluded {
546 abs_path: abs_new_path,
547 }))
548 })
549 }
550 WorktreeStoreState::Remote {
551 upstream_client,
552 upstream_project_id,
553 ..
554 } => {
555 let response = upstream_client.request(proto::RenameProjectEntry {
556 project_id: *upstream_project_id,
557 entry_id: entry_id.to_proto(),
558 new_path: new_project_path.path.to_proto(),
559 new_worktree_id: new_project_path.worktree_id.to_proto(),
560 });
561 cx.spawn(async move |_, cx| {
562 let response = response.await?;
563 match response.entry {
564 Some(entry) => new_worktree
565 .update(cx, |worktree, cx| {
566 worktree.as_remote_mut().unwrap().insert_entry(
567 entry,
568 response.worktree_scan_id as usize,
569 cx,
570 )
571 })
572 .await
573 .map(CreatedEntry::Included),
574 None => {
575 let abs_path = new_worktree.read_with(cx, |worktree, _| {
576 worktree.absolutize(&new_project_path.path)
577 });
578 Ok(CreatedEntry::Excluded { abs_path })
579 }
580 }
581 })
582 }
583 }
584 }
585 pub fn create_worktree(
586 &mut self,
587 abs_path: impl AsRef<Path>,
588 visible: bool,
589 cx: &mut Context<Self>,
590 ) -> Task<Result<Entity<Worktree>>> {
591 let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
592 let is_via_collab = matches!(&self.state, WorktreeStoreState::Remote { upstream_client, .. } if upstream_client.is_via_collab());
593 if !self.loading_worktrees.contains_key(&abs_path) {
594 let task = match &self.state {
595 WorktreeStoreState::Remote {
596 upstream_client,
597 path_style,
598 ..
599 } => {
600 if upstream_client.is_via_collab() {
601 Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
602 } else {
603 let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
604 self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
605 }
606 }
607 WorktreeStoreState::Local { fs } => {
608 self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
609 }
610 };
611
612 self.loading_worktrees
613 .insert(abs_path.clone(), task.shared());
614
615 if visible && self.scanning_enabled {
616 *self.initial_scan_complete.0.borrow_mut() = false;
617 }
618 }
619 let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
620 cx.spawn(async move |this, cx| {
621 let result = task.await;
622 this.update(cx, |this, cx| {
623 this.loading_worktrees.remove(&abs_path);
624 if !visible || !this.scanning_enabled || result.is_err() {
625 this.update_initial_scan_state(cx);
626 }
627 })
628 .ok();
629
630 match result {
631 Ok(worktree) => {
632 if !is_via_collab {
633 if let Some((trusted_worktrees, worktree_store)) = this
634 .update(cx, |_, cx| {
635 TrustedWorktrees::try_get_global(cx).zip(Some(cx.entity()))
636 })
637 .ok()
638 .flatten()
639 {
640 trusted_worktrees.update(cx, |trusted_worktrees, cx| {
641 trusted_worktrees.can_trust(
642 &worktree_store,
643 worktree.read(cx).id(),
644 cx,
645 );
646 });
647 }
648
649 this.update(cx, |this, cx| {
650 if this.scanning_enabled && visible {
651 this.observe_worktree_scan_completion(&worktree, cx);
652 }
653 })
654 .ok();
655 }
656 Ok(worktree)
657 }
658 Err(err) => Err((*err).cloned()),
659 }
660 })
661 }
662
663 fn create_remote_worktree(
664 &mut self,
665 client: AnyProtoClient,
666 abs_path: RemotePathBuf,
667 visible: bool,
668 cx: &mut Context<Self>,
669 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
670 let path_style = abs_path.path_style();
671 let mut abs_path = abs_path.to_string();
672 // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
673 // in which case want to strip the leading the `/`.
674 // On the host-side, the `~` will get expanded.
675 // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
676 if abs_path.starts_with("/~") {
677 abs_path = abs_path[1..].to_string();
678 }
679 if abs_path.is_empty() {
680 abs_path = "~/".to_string();
681 }
682
683 cx.spawn(async move |this, cx| {
684 let this = this.upgrade().context("Dropped worktree store")?;
685
686 let path = RemotePathBuf::new(abs_path, path_style);
687 let response = client
688 .request(proto::AddWorktree {
689 project_id: REMOTE_SERVER_PROJECT_ID,
690 path: path.to_proto(),
691 visible,
692 })
693 .await?;
694
695 if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
696 this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
697 }) {
698 return Ok(existing_worktree);
699 }
700
701 let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
702 let root_name = root_path_buf
703 .file_name()
704 .map(|n| n.to_string_lossy().into_owned())
705 .unwrap_or(root_path_buf.to_string_lossy().into_owned());
706
707 let worktree = cx.update(|cx| {
708 Worktree::remote(
709 REMOTE_SERVER_PROJECT_ID,
710 ReplicaId::REMOTE_SERVER,
711 proto::WorktreeMetadata {
712 id: response.worktree_id,
713 root_name,
714 visible,
715 abs_path: response.canonicalized_path,
716 root_repo_common_dir: response.root_repo_common_dir,
717 },
718 client,
719 path_style,
720 cx,
721 )
722 });
723
724 this.update(cx, |this, cx| {
725 this.add(&worktree, cx);
726 });
727 Ok(worktree)
728 })
729 }
730
731 fn create_local_worktree(
732 &mut self,
733 fs: Arc<dyn Fs>,
734 abs_path: Arc<SanitizedPath>,
735 visible: bool,
736 cx: &mut Context<Self>,
737 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
738 let next_entry_id = self.next_entry_id.clone();
739 let scanning_enabled = self.scanning_enabled;
740
741 let next_worktree_id = self.next_worktree_id();
742
743 cx.spawn(async move |this, cx| {
744 let worktree_id = next_worktree_id.await?;
745 let worktree = Worktree::local(
746 SanitizedPath::cast_arc(abs_path.clone()),
747 visible,
748 fs,
749 next_entry_id,
750 scanning_enabled,
751 worktree_id,
752 cx,
753 )
754 .await?;
755
756 this.update(cx, |this, cx| this.add(&worktree, cx))?;
757
758 if visible {
759 cx.update(|cx| {
760 cx.add_recent_document(abs_path.as_path());
761 });
762 }
763
764 Ok(worktree)
765 })
766 }
767
768 pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
769 let worktree_id = worktree.read(cx).id();
770 debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
771
772 let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
773 let handle = if push_strong_handle {
774 WorktreeHandle::Strong(worktree.clone())
775 } else {
776 WorktreeHandle::Weak(worktree.downgrade())
777 };
778 if self.worktrees_reordered {
779 self.worktrees.push(handle);
780 } else {
781 let i = match self
782 .worktrees
783 .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
784 other.upgrade().map(|worktree| worktree.read(cx).abs_path())
785 }) {
786 Ok(i) | Err(i) => i,
787 };
788 self.worktrees.insert(i, handle);
789 }
790
791 cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
792 self.send_project_updates(cx);
793
794 let handle_id = worktree.entity_id();
795 cx.subscribe(worktree, |_, worktree, event, cx| {
796 let worktree_id = worktree.read(cx).id();
797 match event {
798 worktree::Event::UpdatedEntries(changes) => {
799 cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
800 worktree_id,
801 changes.clone(),
802 ));
803 }
804 worktree::Event::UpdatedGitRepositories(set) => {
805 cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
806 worktree_id,
807 set.clone(),
808 ));
809 }
810 worktree::Event::DeletedEntry(id) => {
811 cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
812 }
813 worktree::Event::Deleted => {
814 // The worktree root itself has been deleted (for single-file worktrees)
815 // The worktree will be removed via the observe_release callback
816 }
817 worktree::Event::UpdatedRootRepoCommonDir => {
818 cx.emit(WorktreeStoreEvent::WorktreeUpdatedRootRepoCommonDir(
819 worktree_id,
820 ));
821 }
822 }
823 })
824 .detach();
825 cx.observe_release(worktree, move |this, worktree, cx| {
826 cx.emit(WorktreeStoreEvent::WorktreeReleased(
827 handle_id,
828 worktree.id(),
829 ));
830 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
831 handle_id,
832 worktree.id(),
833 ));
834 this.send_project_updates(cx);
835 })
836 .detach();
837 }
838
839 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
840 self.worktrees.retain(|worktree| {
841 if let Some(worktree) = worktree.upgrade() {
842 if worktree.read(cx).id() == id_to_remove {
843 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
844 worktree.entity_id(),
845 id_to_remove,
846 ));
847 false
848 } else {
849 true
850 }
851 } else {
852 false
853 }
854 });
855 self.update_initial_scan_state(cx);
856 self.send_project_updates(cx);
857 }
858
859 pub fn worktree_for_main_worktree_path(
860 &self,
861 path: &Path,
862 cx: &App,
863 ) -> Option<Entity<Worktree>> {
864 self.visible_worktrees(cx).find(|worktree| {
865 let worktree = worktree.read(cx);
866 if let Some(common_dir) = worktree.root_repo_common_dir() {
867 common_dir.parent() == Some(path)
868 } else {
869 worktree.abs_path().as_ref() == path
870 }
871 })
872 }
873
874 pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
875 self.worktrees_reordered = worktrees_reordered;
876 }
877
878 fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
879 match &self.state {
880 WorktreeStoreState::Remote {
881 upstream_client,
882 upstream_project_id,
883 ..
884 } => Some((upstream_client.clone(), *upstream_project_id)),
885 WorktreeStoreState::Local { .. } => None,
886 }
887 }
888
889 pub fn set_worktrees_from_proto(
890 &mut self,
891 worktrees: Vec<proto::WorktreeMetadata>,
892 replica_id: ReplicaId,
893 cx: &mut Context<Self>,
894 ) -> Result<()> {
895 let mut old_worktrees_by_id = self
896 .worktrees
897 .drain(..)
898 .filter_map(|worktree| {
899 let worktree = worktree.upgrade()?;
900 Some((worktree.read(cx).id(), worktree))
901 })
902 .collect::<HashMap<_, _>>();
903
904 let (client, project_id) = self.upstream_client().context("invalid project")?;
905
906 for worktree in worktrees {
907 if let Some(old_worktree) =
908 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
909 {
910 let push_strong_handle =
911 self.retain_worktrees || old_worktree.read(cx).is_visible();
912 let handle = if push_strong_handle {
913 WorktreeHandle::Strong(old_worktree.clone())
914 } else {
915 WorktreeHandle::Weak(old_worktree.downgrade())
916 };
917 self.worktrees.push(handle);
918 } else {
919 self.add(
920 &Worktree::remote(
921 project_id,
922 replica_id,
923 worktree,
924 client.clone(),
925 self.path_style(),
926 cx,
927 ),
928 cx,
929 );
930 }
931 }
932 self.send_project_updates(cx);
933
934 Ok(())
935 }
936
937 pub fn move_worktree(
938 &mut self,
939 source: WorktreeId,
940 destination: WorktreeId,
941 cx: &mut Context<Self>,
942 ) -> Result<()> {
943 if source == destination {
944 return Ok(());
945 }
946
947 let mut source_index = None;
948 let mut destination_index = None;
949 for (i, worktree) in self.worktrees.iter().enumerate() {
950 if let Some(worktree) = worktree.upgrade() {
951 let worktree_id = worktree.read(cx).id();
952 if worktree_id == source {
953 source_index = Some(i);
954 if destination_index.is_some() {
955 break;
956 }
957 } else if worktree_id == destination {
958 destination_index = Some(i);
959 if source_index.is_some() {
960 break;
961 }
962 }
963 }
964 }
965
966 let source_index =
967 source_index.with_context(|| format!("Missing worktree for id {source}"))?;
968 let destination_index =
969 destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
970
971 if source_index == destination_index {
972 return Ok(());
973 }
974
975 let worktree_to_move = self.worktrees.remove(source_index);
976 self.worktrees.insert(destination_index, worktree_to_move);
977 self.worktrees_reordered = true;
978 cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
979 cx.notify();
980 Ok(())
981 }
982
983 pub fn disconnected_from_host(&mut self, cx: &mut App) {
984 for worktree in &self.worktrees {
985 if let Some(worktree) = worktree.upgrade() {
986 worktree.update(cx, |worktree, _| {
987 if let Some(worktree) = worktree.as_remote_mut() {
988 worktree.disconnected_from_host();
989 }
990 });
991 }
992 }
993 }
994
995 pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
996 let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
997 return;
998 };
999
1000 let update = proto::UpdateProject {
1001 project_id,
1002 worktrees: self.worktree_metadata_protos(cx),
1003 };
1004
1005 // collab has bad concurrency guarantees, so we send requests in serial.
1006 let update_project = if downstream_client.is_via_collab() {
1007 Some(downstream_client.request(update))
1008 } else {
1009 downstream_client.send(update).log_err();
1010 None
1011 };
1012 cx.spawn(async move |this, cx| {
1013 if let Some(update_project) = update_project {
1014 update_project.await?;
1015 }
1016
1017 this.update(cx, |this, cx| {
1018 let worktrees = this.worktrees().collect::<Vec<_>>();
1019
1020 for worktree in worktrees {
1021 worktree.update(cx, |worktree, cx| {
1022 let client = downstream_client.clone();
1023 worktree.observe_updates(project_id, cx, {
1024 move |update| {
1025 let client = client.clone();
1026 async move {
1027 if client.is_via_collab() {
1028 client
1029 .request(update)
1030 .map(|result| result.log_err().is_some())
1031 .await
1032 } else {
1033 client.send(update).log_err().is_some()
1034 }
1035 }
1036 }
1037 });
1038 });
1039
1040 cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
1041 }
1042
1043 anyhow::Ok(())
1044 })
1045 })
1046 .detach_and_log_err(cx);
1047 }
1048
1049 pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
1050 self.worktrees()
1051 .map(|worktree| {
1052 let worktree = worktree.read(cx);
1053 proto::WorktreeMetadata {
1054 id: worktree.id().to_proto(),
1055 root_name: worktree.root_name_str().to_owned(),
1056 visible: worktree.is_visible(),
1057 abs_path: worktree.abs_path().to_string_lossy().into_owned(),
1058 root_repo_common_dir: worktree
1059 .root_repo_common_dir()
1060 .map(|p| p.to_string_lossy().into_owned()),
1061 }
1062 })
1063 .collect()
1064 }
1065
1066 pub fn shared(
1067 &mut self,
1068 remote_id: u64,
1069 downstream_client: AnyProtoClient,
1070 cx: &mut Context<Self>,
1071 ) {
1072 self.retain_worktrees = true;
1073 self.downstream_client = Some((downstream_client, remote_id));
1074
1075 // When shared, retain all worktrees
1076 for worktree_handle in self.worktrees.iter_mut() {
1077 match worktree_handle {
1078 WorktreeHandle::Strong(_) => {}
1079 WorktreeHandle::Weak(worktree) => {
1080 if let Some(worktree) = worktree.upgrade() {
1081 *worktree_handle = WorktreeHandle::Strong(worktree);
1082 }
1083 }
1084 }
1085 }
1086 // Only send project updates if we share in a collaborative mode.
1087 // Otherwise we are the remote server which is currently constructing
1088 // worktree store before the client actually has set up its message
1089 // handlers.
1090 if remote_id != REMOTE_SERVER_PROJECT_ID {
1091 self.send_project_updates(cx);
1092 }
1093 }
1094
1095 pub fn unshared(&mut self, cx: &mut Context<Self>) {
1096 self.retain_worktrees = false;
1097 self.downstream_client.take();
1098
1099 // When not shared, only retain the visible worktrees
1100 for worktree_handle in self.worktrees.iter_mut() {
1101 if let WorktreeHandle::Strong(worktree) = worktree_handle {
1102 let is_visible = worktree.update(cx, |worktree, _| {
1103 worktree.stop_observing_updates();
1104 worktree.is_visible()
1105 });
1106 if !is_visible {
1107 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1108 }
1109 }
1110 }
1111 }
1112
1113 pub async fn handle_create_project_entry(
1114 this: Entity<Self>,
1115 envelope: TypedEnvelope<proto::CreateProjectEntry>,
1116 mut cx: AsyncApp,
1117 ) -> Result<proto::ProjectEntryResponse> {
1118 let worktree = this.update(&mut cx, |this, cx| {
1119 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1120 this.worktree_for_id(worktree_id, cx)
1121 .context("worktree not found")
1122 })?;
1123 Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1124 }
1125
1126 pub async fn handle_copy_project_entry(
1127 this: Entity<Self>,
1128 envelope: TypedEnvelope<proto::CopyProjectEntry>,
1129 mut cx: AsyncApp,
1130 ) -> Result<proto::ProjectEntryResponse> {
1131 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1132 let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1133 let new_project_path = (
1134 new_worktree_id,
1135 RelPath::from_proto(&envelope.payload.new_path)?,
1136 );
1137 let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1138 let Some((_, project_id)) = this.downstream_client else {
1139 bail!("no downstream client")
1140 };
1141 let Some(entry) = this.entry_for_id(entry_id, cx) else {
1142 bail!("no such entry");
1143 };
1144 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1145 bail!("entry is private")
1146 }
1147
1148 let new_worktree = this
1149 .worktree_for_id(new_worktree_id, cx)
1150 .context("no such worktree")?;
1151 let scan_id = new_worktree.read(cx).scan_id();
1152 anyhow::Ok((
1153 scan_id,
1154 this.copy_entry(entry_id, new_project_path.into(), cx),
1155 ))
1156 })?;
1157 let entry = entry.await?;
1158 Ok(proto::ProjectEntryResponse {
1159 entry: entry.as_ref().map(|entry| entry.into()),
1160 worktree_scan_id: scan_id as u64,
1161 trashed_entry: None,
1162 })
1163 }
1164
1165 pub async fn handle_delete_project_entry(
1166 this: Entity<Self>,
1167 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1168 mut cx: AsyncApp,
1169 ) -> Result<proto::ProjectEntryResponse> {
1170 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1171 let worktree = this.update(&mut cx, |this, cx| {
1172 let Some((_, project_id)) = this.downstream_client else {
1173 bail!("no downstream client")
1174 };
1175 let Some(entry) = this.entry_for_id(entry_id, cx) else {
1176 bail!("no entry")
1177 };
1178 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1179 bail!("entry is private")
1180 }
1181 this.worktree_for_entry(entry_id, cx)
1182 .context("worktree not found")
1183 })?;
1184 Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1185 }
1186
1187 pub async fn handle_rename_project_entry(
1188 this: Entity<Self>,
1189 request: proto::RenameProjectEntry,
1190 mut cx: AsyncApp,
1191 ) -> Result<proto::ProjectEntryResponse> {
1192 let entry_id = ProjectEntryId::from_proto(request.entry_id);
1193 let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1194 let rel_path = RelPath::from_proto(&request.new_path)
1195 .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1196
1197 let (scan_id, task) = this.update(&mut cx, |this, cx| {
1198 let worktree = this
1199 .worktree_for_entry(entry_id, cx)
1200 .context("no such worktree")?;
1201
1202 let Some((_, project_id)) = this.downstream_client else {
1203 bail!("no downstream client")
1204 };
1205 let entry = worktree
1206 .read(cx)
1207 .entry_for_id(entry_id)
1208 .ok_or_else(|| anyhow!("missing entry"))?;
1209 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1210 bail!("entry is private")
1211 }
1212
1213 let scan_id = worktree.read(cx).scan_id();
1214 anyhow::Ok((
1215 scan_id,
1216 this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1217 ))
1218 })?;
1219 Ok(proto::ProjectEntryResponse {
1220 entry: match &task.await? {
1221 CreatedEntry::Included(entry) => Some(entry.into()),
1222 CreatedEntry::Excluded { .. } => None,
1223 },
1224 worktree_scan_id: scan_id as u64,
1225 trashed_entry: None,
1226 })
1227 }
1228
1229 pub async fn handle_expand_project_entry(
1230 this: Entity<Self>,
1231 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1232 mut cx: AsyncApp,
1233 ) -> Result<proto::ExpandProjectEntryResponse> {
1234 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1235 let worktree = this
1236 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))
1237 .context("invalid request")?;
1238 Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1239 }
1240
1241 pub async fn handle_expand_all_for_project_entry(
1242 this: Entity<Self>,
1243 envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1244 mut cx: AsyncApp,
1245 ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1246 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1247 let worktree = this
1248 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))
1249 .context("invalid request")?;
1250 Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1251 }
1252
1253 pub async fn handle_allocate_worktree_id(
1254 _this: Entity<Self>,
1255 _envelope: TypedEnvelope<proto::AllocateWorktreeId>,
1256 cx: AsyncApp,
1257 ) -> Result<proto::AllocateWorktreeIdResponse> {
1258 let worktree_id = cx.update(|cx| WorktreeIdCounter::get(cx).next());
1259 Ok(proto::AllocateWorktreeIdResponse { worktree_id })
1260 }
1261
1262 pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1263 match &self.state {
1264 WorktreeStoreState::Local { fs } => Some(fs.clone()),
1265 WorktreeStoreState::Remote { .. } => None,
1266 }
1267 }
1268}
1269
1270#[derive(Clone, Debug)]
1271enum WorktreeHandle {
1272 Strong(Entity<Worktree>),
1273 Weak(WeakEntity<Worktree>),
1274}
1275
1276impl WorktreeHandle {
1277 fn upgrade(&self) -> Option<Entity<Worktree>> {
1278 match self {
1279 WorktreeHandle::Strong(handle) => Some(handle.clone()),
1280 WorktreeHandle::Weak(handle) => handle.upgrade(),
1281 }
1282 }
1283}