worktree_store.rs

   1use std::{
   2    future::Future,
   3    path::{Path, PathBuf},
   4    sync::{
   5        Arc,
   6        atomic::{AtomicU64, AtomicUsize},
   7    },
   8};
   9
  10use anyhow::{Context as _, Result, anyhow, bail};
  11use collections::HashMap;
  12use fs::{Fs, copy_recursive};
  13use futures::{FutureExt, future::Shared};
  14use gpui::{
  15    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Global, Task,
  16    WeakEntity,
  17};
  18use itertools::Either;
  19use postage::{prelude::Stream as _, watch};
  20use rpc::{
  21    AnyProtoClient, ErrorExt, TypedEnvelope,
  22    proto::{self, REMOTE_SERVER_PROJECT_ID},
  23};
  24use text::ReplicaId;
  25use util::{
  26    ResultExt,
  27    paths::{PathStyle, RemotePathBuf, SanitizedPath},
  28    rel_path::RelPath,
  29};
  30use worktree::{
  31    CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
  32    WorktreeId,
  33};
  34
  35use crate::{ProjectPath, trusted_worktrees::TrustedWorktrees};
  36
/// Distinguishes a store that works directly against a filesystem from one
/// that forwards worktree operations to an upstream peer.
enum WorktreeStoreState {
    /// Worktrees are created and modified through the `fs` handle.
    Local {
        fs: Arc<dyn Fs>,
    },
    /// Entry operations are sent as RPC requests via `upstream_client`.
    Remote {
        upstream_client: AnyProtoClient,
        // Project id placed into requests sent upstream (see `copy_entry` / `rename_entry`).
        upstream_project_id: u64,
        // Path style returned by `WorktreeStore::path_style` for this store.
        path_style: PathStyle,
    },
}
  47
/// Shared, cloneable counter that hands out raw worktree ids.
///
/// Clones share the same underlying atomic, so ids are unique across all
/// holders of a clone.
#[derive(Clone)]
pub struct WorktreeIdCounter(Arc<AtomicU64>);
  50
  51impl Default for WorktreeIdCounter {
  52    fn default() -> Self {
  53        Self(Arc::new(AtomicU64::new(1)))
  54    }
  55}
  56
  57impl WorktreeIdCounter {
  58    pub fn get(cx: &mut App) -> Self {
  59        cx.default_global::<Self>().clone()
  60    }
  61
  62    fn next(&self) -> u64 {
  63        self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
  64    }
  65}
  66
// Lets the counter be stored as a gpui global (see `WorktreeIdCounter::get`).
impl Global for WorktreeIdCounter {}
  68
/// Owns the set of worktrees of a project and tracks worktrees that are
/// still being created.
pub struct WorktreeStore {
    // Shared entry-id counter handed to every locally created worktree.
    next_entry_id: Arc<AtomicUsize>,
    // Source of worktree ids when this store is the one assigning them.
    next_worktree_id: WorktreeIdCounter,
    // Client plus a `u64` that `next_worktree_id` compares against
    // `REMOTE_SERVER_PROJECT_ID` (presumably the downstream project id).
    downstream_client: Option<(AnyProtoClient, u64)>,
    // When true, `add` keeps a strong handle even for invisible worktrees.
    retain_worktrees: bool,
    worktrees: Vec<WorktreeHandle>,
    // When false, `add` inserts new worktrees at the position found by a
    // binary search on their absolute path; when true it appends.
    worktrees_reordered: bool,
    // Cleared by `disable_scanner`; forwarded to `Worktree::local`.
    scanning_enabled: bool,
    // In-flight worktree creations keyed by path, shared so that concurrent
    // requests for the same path await the same task.
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
    // Watch channel holding the aggregate "initial scan finished" flag.
    initial_scan_complete: (watch::Sender<bool>, watch::Receiver<bool>),
    state: WorktreeStoreState,
}
  83
/// Events emitted by [`WorktreeStore`].
#[derive(Debug)]
pub enum WorktreeStoreEvent {
    /// Emitted by `WorktreeStore::add` after the worktree has been stored.
    WorktreeAdded(Entity<Worktree>),
    WorktreeRemoved(EntityId, WorktreeId),
    WorktreeReleased(EntityId, WorktreeId),
    WorktreeOrderChanged,
    WorktreeUpdateSent(Entity<Worktree>),
    /// Re-emission of a worktree's `UpdatedEntries` event, tagged with its id.
    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
}
  95
// Allows `WorktreeStore` to `cx.emit` `WorktreeStoreEvent`s.
impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  97
  98impl WorktreeStore {
    /// Registers this store's request handlers for project-entry operations
    /// (create, copy, delete, expand, expand-all) on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_create_project_entry);
        client.add_entity_request_handler(Self::handle_copy_project_entry);
        client.add_entity_request_handler(Self::handle_delete_project_entry);
        client.add_entity_request_handler(Self::handle_expand_project_entry);
        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
    }
 106
    /// Registers the handler that answers worktree-id allocation requests
    /// (the counterpart of the request sent by `next_worktree_id`).
    pub fn init_remote(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_allocate_worktree_id);
    }
 110
 111    pub fn local(
 112        retain_worktrees: bool,
 113        fs: Arc<dyn Fs>,
 114        next_worktree_id: WorktreeIdCounter,
 115    ) -> Self {
 116        Self {
 117            next_entry_id: Default::default(),
 118            next_worktree_id,
 119            loading_worktrees: Default::default(),
 120            downstream_client: None,
 121            worktrees: Vec::new(),
 122            worktrees_reordered: false,
 123            scanning_enabled: true,
 124            retain_worktrees,
 125            initial_scan_complete: watch::channel_with(true),
 126            state: WorktreeStoreState::Local { fs },
 127        }
 128    }
 129
 130    pub fn remote(
 131        retain_worktrees: bool,
 132        upstream_client: AnyProtoClient,
 133        upstream_project_id: u64,
 134        path_style: PathStyle,
 135        next_worktree_id: WorktreeIdCounter,
 136    ) -> Self {
 137        Self {
 138            next_entry_id: Default::default(),
 139            next_worktree_id,
 140            loading_worktrees: Default::default(),
 141            downstream_client: None,
 142            worktrees: Vec::new(),
 143            worktrees_reordered: false,
 144            scanning_enabled: true,
 145            retain_worktrees,
 146            initial_scan_complete: watch::channel_with(true),
 147            state: WorktreeStoreState::Remote {
 148                upstream_client,
 149                upstream_project_id,
 150                path_style,
 151            },
 152        }
 153    }
 154
 155    pub fn next_worktree_id(&self) -> impl Future<Output = Result<WorktreeId>> + use<> {
 156        let strategy = match (&self.state, &self.downstream_client) {
 157            // we are a remote server, the client is in charge of assigning worktree ids
 158            (WorktreeStoreState::Local { .. }, Some((client, REMOTE_SERVER_PROJECT_ID))) => {
 159                Either::Left(client.clone())
 160            }
 161            // we are just a local zed project, we can assign ids
 162            (WorktreeStoreState::Local { .. }, _) => Either::Right(self.next_worktree_id.next()),
 163            // we are connected to a remote server, we are in charge of assigning worktree ids
 164            (WorktreeStoreState::Remote { .. }, _) => Either::Right(self.next_worktree_id.next()),
 165        };
 166        async move {
 167            match strategy {
 168                Either::Left(client) => Ok(client
 169                    .request(proto::AllocateWorktreeId {
 170                        project_id: REMOTE_SERVER_PROJECT_ID,
 171                    })
 172                    .await?
 173                    .worktree_id),
 174                Either::Right(id) => Ok(id),
 175            }
 176            .map(WorktreeId::from_proto)
 177        }
 178    }
 179
    /// Turns scanning off for worktrees created from now on and marks the
    /// initial scan as complete, since no scan will be waited for.
    pub fn disable_scanner(&mut self) {
        self.scanning_enabled = false;
        *self.initial_scan_complete.0.borrow_mut() = true;
    }
 184
 185    /// Returns a future that resolves when all visible worktrees have completed
 186    /// their initial scan (entries populated, git repos detected).
 187    pub fn wait_for_initial_scan(&self) -> impl Future<Output = ()> + use<> {
 188        let mut rx = self.initial_scan_complete.1.clone();
 189        async move {
 190            let mut done = *rx.borrow();
 191            while !done {
 192                if let Some(value) = rx.recv().await {
 193                    done = value;
 194                } else {
 195                    break;
 196                }
 197            }
 198        }
 199    }
 200
    /// Returns whether all visible worktrees have completed their initial scan.
    ///
    /// This reads the current value of the watch channel without waiting.
    pub fn initial_scan_completed(&self) -> bool {
        *self.initial_scan_complete.1.borrow()
    }
 205
 206    /// Checks whether all visible worktrees have completed their initial scan
 207    /// and no worktree creations are pending, and updates the watch channel accordingly.
 208    fn update_initial_scan_state(&mut self, cx: &App) {
 209        let complete = self.loading_worktrees.is_empty()
 210            && self
 211                .visible_worktrees(cx)
 212                .all(|wt| wt.read(cx).completed_scan_id() >= 1);
 213        *self.initial_scan_complete.0.borrow_mut() = complete;
 214    }
 215
 216    /// Spawns a detached task that waits for a worktree's initial scan to complete,
 217    /// then rechecks and updates the aggregate initial scan state.
 218    fn observe_worktree_scan_completion(
 219        &mut self,
 220        worktree: &Entity<Worktree>,
 221        cx: &mut Context<Self>,
 222    ) {
 223        let await_scan = worktree.update(cx, |worktree, _cx| worktree.wait_for_snapshot(1));
 224        cx.spawn(async move |this, cx| {
 225            await_scan.await.ok();
 226            this.update(cx, |this, cx| {
 227                this.update_initial_scan_state(cx);
 228            })
 229            .ok();
 230            anyhow::Ok(())
 231        })
 232        .detach();
 233    }
 234
 235    /// Iterates through all worktrees, including ones that don't appear in the project panel
 236    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 237        self.worktrees
 238            .iter()
 239            .filter_map(move |worktree| worktree.upgrade())
 240    }
 241
 242    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 243    pub fn visible_worktrees<'a>(
 244        &'a self,
 245        cx: &'a App,
 246    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 247        self.worktrees()
 248            .filter(|worktree| worktree.read(cx).is_visible())
 249    }
 250
 251    /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
 252    pub fn visible_worktrees_and_single_files<'a>(
 253        &'a self,
 254        cx: &'a App,
 255    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 256        self.worktrees()
 257            .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
 258    }
 259
 260    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 261        self.worktrees()
 262            .find(|worktree| worktree.read(cx).id() == id)
 263    }
 264
 265    pub fn worktree_for_entry(
 266        &self,
 267        entry_id: ProjectEntryId,
 268        cx: &App,
 269    ) -> Option<Entity<Worktree>> {
 270        self.worktrees()
 271            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 272    }
 273
 274    pub fn find_worktree(
 275        &self,
 276        abs_path: impl AsRef<Path>,
 277        cx: &App,
 278    ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
 279        let abs_path = SanitizedPath::new(abs_path.as_ref());
 280        for tree in self.worktrees() {
 281            let path_style = tree.read(cx).path_style();
 282            if let Some(relative_path) =
 283                path_style.strip_prefix(abs_path.as_ref(), tree.read(cx).abs_path().as_ref())
 284            {
 285                return Some((tree.clone(), relative_path.into_arc()));
 286            }
 287        }
 288        None
 289    }
 290
 291    pub fn project_path_for_absolute_path(&self, abs_path: &Path, cx: &App) -> Option<ProjectPath> {
 292        self.find_worktree(abs_path, cx)
 293            .map(|(worktree, relative_path)| ProjectPath {
 294                worktree_id: worktree.read(cx).id(),
 295                path: relative_path,
 296            })
 297    }
 298
 299    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
 300        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 301        Some(worktree.read(cx).absolutize(&project_path.path))
 302    }
 303
 304    pub fn path_style(&self) -> PathStyle {
 305        match &self.state {
 306            WorktreeStoreState::Local { .. } => PathStyle::local(),
 307            WorktreeStoreState::Remote { path_style, .. } => *path_style,
 308        }
 309    }
 310
 311    pub fn find_or_create_worktree(
 312        &mut self,
 313        abs_path: impl AsRef<Path>,
 314        visible: bool,
 315        cx: &mut Context<Self>,
 316    ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
 317        let abs_path = abs_path.as_ref();
 318        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 319            Task::ready(Ok((tree, relative_path)))
 320        } else {
 321            let worktree = self.create_worktree(abs_path, visible, cx);
 322            cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
 323        }
 324    }
 325
 326    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 327        self.worktrees()
 328            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 329    }
 330
 331    pub fn worktree_and_entry_for_id<'a>(
 332        &'a self,
 333        entry_id: ProjectEntryId,
 334        cx: &'a App,
 335    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 336        self.worktrees().find_map(|worktree| {
 337            worktree
 338                .read(cx)
 339                .entry_for_id(entry_id)
 340                .map(|e| (worktree.clone(), e))
 341        })
 342    }
 343
 344    pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
 345        self.worktree_for_id(path.worktree_id, cx)?
 346            .read(cx)
 347            .entry_for_path(&path.path)
 348    }
 349
    /// Copies the entry `entry_id` to `new_project_path`, which may belong to
    /// a different worktree than the source.
    ///
    /// A local store copies recursively on the filesystem and then refreshes
    /// the destination entry. A remote store sends a `CopyProjectEntry`
    /// request upstream and inserts the entry from the response, resolving to
    /// `Ok(None)` when the response carries no entry.
    ///
    /// Fails immediately when the source worktree, the source entry or the
    /// destination worktree cannot be found.
    pub fn copy_entry(
        &mut self,
        entry_id: ProjectEntryId,
        new_project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Option<Entry>>> {
        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
            return Task::ready(Err(anyhow!("no such entry")));
        };
        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        match &self.state {
            WorktreeStoreState::Local { fs } => {
                let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
                let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
                let fs = fs.clone();
                // The filesystem copy runs on the background executor.
                let copy = cx.background_spawn(async move {
                    copy_recursive(
                        fs.as_ref(),
                        &old_abs_path,
                        &new_abs_path,
                        Default::default(),
                    )
                    .await
                });

                cx.spawn(async move |_, cx| {
                    copy.await?;
                    // Refresh the destination so the new entry is picked up
                    // once the copy has finished.
                    new_worktree
                        .update(cx, |this, cx| {
                            this.as_local_mut().unwrap().refresh_entry(
                                new_project_path.path,
                                None,
                                cx,
                            )
                        })
                        .await
                })
            }
            WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
                ..
            } => {
                let response = upstream_client.request(proto::CopyProjectEntry {
                    project_id: *upstream_project_id,
                    entry_id: entry_id.to_proto(),
                    new_path: new_project_path.path.to_proto(),
                    new_worktree_id: new_project_path.worktree_id.to_proto(),
                });
                cx.spawn(async move |_, cx| {
                    let response = response.await?;
                    match response.entry {
                        // Mirror the entry reported by the upstream side into
                        // the remote worktree.
                        Some(entry) => new_worktree
                            .update(cx, |worktree, cx| {
                                worktree.as_remote_mut().unwrap().insert_entry(
                                    entry,
                                    response.worktree_scan_id as usize,
                                    cx,
                                )
                            })
                            .await
                            .map(Some),
                        None => Ok(None),
                    }
                })
            }
        }
    }
 424
    /// Renames (moves) the entry `entry_id` to `new_project_path`.
    ///
    /// A local store performs the rename on the filesystem, creating the
    /// destination's parent directory and retrying once if the first attempt
    /// fails with `NotFound`, and then refreshes the affected worktree. A
    /// remote store sends a `RenameProjectEntry` request upstream and inserts
    /// the entry from the response.
    ///
    /// Resolves to `CreatedEntry::Included` when an entry for the new path is
    /// available afterwards, and to `CreatedEntry::Excluded` carrying the new
    /// absolute path otherwise.
    ///
    /// Fails immediately when the source worktree, the source entry or the
    /// destination worktree cannot be found.
    pub fn rename_entry(
        &mut self,
        entry_id: ProjectEntryId,
        new_project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<CreatedEntry>> {
        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
            return Task::ready(Err(anyhow!("no such entry")));
        };
        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        match &self.state {
            WorktreeStoreState::Local { fs } => {
                let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
                let new_worktree_ref = new_worktree.read(cx);
                let is_root_entry = new_worktree_ref
                    .root_entry()
                    .is_some_and(|e| e.id == entry_id);
                // Renaming the worktree root: the new path is resolved against
                // the root's parent directory rather than the root itself.
                let abs_new_path = if is_root_entry {
                    let abs_path = new_worktree_ref.abs_path();
                    let Some(root_parent_path) = abs_path.parent() else {
                        return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
                    };
                    root_parent_path.join(new_project_path.path.as_std_path())
                } else {
                    new_worktree_ref.absolutize(&new_project_path.path)
                };

                let fs = fs.clone();
                let case_sensitive = new_worktree
                    .read(cx)
                    .as_local()
                    .unwrap()
                    .fs_is_case_sensitive();

                // Single rename attempt with error context attached; used for
                // both the first try and the retry below.
                let do_rename =
                    async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
                        fs.rename(
                            &old_path,
                            &new_path,
                            fs::RenameOptions {
                                overwrite,
                                ..fs::RenameOptions::default()
                            },
                        )
                        .await
                        .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
                    };

                let rename = cx.background_spawn({
                    let abs_new_path = abs_new_path.clone();
                    async move {
                        // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
                        // we want to overwrite, because otherwise we run into a file-already-exists error.
                        let overwrite = !case_sensitive
                            && abs_old_path != abs_new_path
                            && abs_old_path.to_str().map(|p| p.to_lowercase())
                                == abs_new_path.to_str().map(|p| p.to_lowercase());

                        // The directory we're renaming into might not exist yet
                        if let Err(e) =
                            do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
                        {
                            if let Some(err) = e.downcast_ref::<std::io::Error>()
                                && err.kind() == std::io::ErrorKind::NotFound
                            {
                                if let Some(parent) = abs_new_path.parent() {
                                    fs.create_dir(parent).await.with_context(|| {
                                        format!("creating parent directory {parent:?}")
                                    })?;
                                    // Retry exactly once now that the parent exists.
                                    return do_rename(
                                        fs.as_ref(),
                                        &abs_old_path,
                                        &abs_new_path,
                                        overwrite,
                                    )
                                    .await;
                                }
                            }
                            return Err(e);
                        }
                        Ok(())
                    }
                });

                cx.spawn(async move |_, cx| {
                    rename.await?;
                    Ok(new_worktree
                        .update(cx, |this, cx| {
                            let local = this.as_local_mut().unwrap();
                            if is_root_entry {
                                // We eagerly update `abs_path` and refresh this worktree.
                                // Otherwise, the FS watcher would do it on the `RootUpdated` event,
                                // but with a noticeable delay, so we handle it proactively.
                                local.update_abs_path_and_refresh(
                                    SanitizedPath::new_arc(&abs_new_path),
                                    cx,
                                );
                                Task::ready(Ok(this.root_entry().cloned()))
                            } else {
                                // First refresh the parent directory (in case it was newly created)
                                if let Some(parent) = new_project_path.path.parent() {
                                    let _ = local.refresh_entries_for_paths(vec![parent.into()]);
                                }
                                // Then refresh the new path
                                local.refresh_entry(
                                    new_project_path.path.clone(),
                                    Some(old_entry.path),
                                    cx,
                                )
                            }
                        })
                        .await?
                        // No entry after the refresh: report the path as excluded.
                        .map(CreatedEntry::Included)
                        .unwrap_or_else(|| CreatedEntry::Excluded {
                            abs_path: abs_new_path,
                        }))
                })
            }
            WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
                ..
            } => {
                let response = upstream_client.request(proto::RenameProjectEntry {
                    project_id: *upstream_project_id,
                    entry_id: entry_id.to_proto(),
                    new_path: new_project_path.path.to_proto(),
                    new_worktree_id: new_project_path.worktree_id.to_proto(),
                });
                cx.spawn(async move |_, cx| {
                    let response = response.await?;
                    match response.entry {
                        Some(entry) => new_worktree
                            .update(cx, |worktree, cx| {
                                worktree.as_remote_mut().unwrap().insert_entry(
                                    entry,
                                    response.worktree_scan_id as usize,
                                    cx,
                                )
                            })
                            .await
                            .map(CreatedEntry::Included),
                        // The upstream side returned no entry: report the
                        // destination's absolute path as excluded.
                        None => {
                            let abs_path = new_worktree.read_with(cx, |worktree, _| {
                                worktree.absolutize(&new_project_path.path)
                            });
                            Ok(CreatedEntry::Excluded { abs_path })
                        }
                    }
                })
            }
        }
    }
    /// Creates a worktree rooted at `abs_path`, or joins a creation for the
    /// same path that is already in flight.
    ///
    /// In-flight creations are kept in `loading_worktrees` as shared tasks, so
    /// concurrent callers for one path await the same underlying work. A store
    /// connected via collab cannot create worktrees and yields an error.
    ///
    /// Starting a creation for a visible worktree while scanning is enabled
    /// resets the aggregate initial-scan flag to `false`. After a successful
    /// creation the flag is recomputed once that worktree's first scan has
    /// completed; in all other cases it is recomputed right after the load
    /// finishes.
    pub fn create_worktree(
        &mut self,
        abs_path: impl AsRef<Path>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>>> {
        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
        let is_via_collab = matches!(&self.state, WorktreeStoreState::Remote { upstream_client, .. } if upstream_client.is_via_collab());
        // Only start a new load when none is pending for this path.
        if !self.loading_worktrees.contains_key(&abs_path) {
            let task = match &self.state {
                WorktreeStoreState::Remote {
                    upstream_client,
                    path_style,
                    ..
                } => {
                    if upstream_client.is_via_collab() {
                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
                    } else {
                        let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
                        self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
                    }
                }
                WorktreeStoreState::Local { fs } => {
                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
                }
            };

            self.loading_worktrees
                .insert(abs_path.clone(), task.shared());

            // A new visible worktree has to be scanned before the store
            // counts as fully scanned again.
            if visible && self.scanning_enabled {
                *self.initial_scan_complete.0.borrow_mut() = false;
            }
        }
        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
        cx.spawn(async move |this, cx| {
            let result = task.await;
            this.update(cx, |this, cx| {
                this.loading_worktrees.remove(&abs_path);
                // When a scan will be awaited below, the state is updated by
                // `observe_worktree_scan_completion` instead.
                if !visible || !this.scanning_enabled || result.is_err() {
                    this.update_initial_scan_state(cx);
                }
            })
            .ok();

            match result {
                Ok(worktree) => {
                    if !is_via_collab {
                        // Consult the trusted-worktrees global, if one is set,
                        // about the newly created worktree.
                        if let Some((trusted_worktrees, worktree_store)) = this
                            .update(cx, |_, cx| {
                                TrustedWorktrees::try_get_global(cx).zip(Some(cx.entity()))
                            })
                            .ok()
                            .flatten()
                        {
                            trusted_worktrees.update(cx, |trusted_worktrees, cx| {
                                trusted_worktrees.can_trust(
                                    &worktree_store,
                                    worktree.read(cx).id(),
                                    cx,
                                );
                            });
                        }

                        this.update(cx, |this, cx| {
                            if this.scanning_enabled && visible {
                                this.observe_worktree_scan_completion(&worktree, cx);
                            }
                        })
                        .ok();
                    }
                    Ok(worktree)
                }
                // The shared task stores its error behind an `Arc`; hand each
                // caller its own copy.
                Err(err) => Err((*err).cloned()),
            }
        })
    }
 661
 662    fn create_remote_worktree(
 663        &mut self,
 664        client: AnyProtoClient,
 665        abs_path: RemotePathBuf,
 666        visible: bool,
 667        cx: &mut Context<Self>,
 668    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 669        let path_style = abs_path.path_style();
 670        let mut abs_path = abs_path.to_string();
 671        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 672        // in which case want to strip the leading the `/`.
 673        // On the host-side, the `~` will get expanded.
 674        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 675        if abs_path.starts_with("/~") {
 676            abs_path = abs_path[1..].to_string();
 677        }
 678        if abs_path.is_empty() {
 679            abs_path = "~/".to_string();
 680        }
 681
 682        cx.spawn(async move |this, cx| {
 683            let this = this.upgrade().context("Dropped worktree store")?;
 684
 685            let path = RemotePathBuf::new(abs_path, path_style);
 686            let response = client
 687                .request(proto::AddWorktree {
 688                    project_id: REMOTE_SERVER_PROJECT_ID,
 689                    path: path.to_proto(),
 690                    visible,
 691                })
 692                .await?;
 693
 694            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 695                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 696            }) {
 697                return Ok(existing_worktree);
 698            }
 699
 700            let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
 701            let root_name = root_path_buf
 702                .file_name()
 703                .map(|n| n.to_string_lossy().into_owned())
 704                .unwrap_or(root_path_buf.to_string_lossy().into_owned());
 705
 706            let worktree = cx.update(|cx| {
 707                Worktree::remote(
 708                    REMOTE_SERVER_PROJECT_ID,
 709                    ReplicaId::REMOTE_SERVER,
 710                    proto::WorktreeMetadata {
 711                        id: response.worktree_id,
 712                        root_name,
 713                        visible,
 714                        abs_path: response.canonicalized_path,
 715                    },
 716                    client,
 717                    path_style,
 718                    cx,
 719                )
 720            });
 721
 722            this.update(cx, |this, cx| {
 723                this.add(&worktree, cx);
 724            });
 725            Ok(worktree)
 726        })
 727    }
 728
 729    fn create_local_worktree(
 730        &mut self,
 731        fs: Arc<dyn Fs>,
 732        abs_path: Arc<SanitizedPath>,
 733        visible: bool,
 734        cx: &mut Context<Self>,
 735    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 736        let next_entry_id = self.next_entry_id.clone();
 737        let scanning_enabled = self.scanning_enabled;
 738
 739        let next_worktree_id = self.next_worktree_id();
 740
 741        cx.spawn(async move |this, cx| {
 742            let worktree_id = next_worktree_id.await?;
 743            let worktree = Worktree::local(
 744                SanitizedPath::cast_arc(abs_path.clone()),
 745                visible,
 746                fs,
 747                next_entry_id,
 748                scanning_enabled,
 749                worktree_id,
 750                cx,
 751            )
 752            .await?;
 753
 754            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 755
 756            if visible {
 757                cx.update(|cx| {
 758                    cx.add_recent_document(abs_path.as_path());
 759                });
 760            }
 761
 762            Ok(worktree)
 763        })
 764    }
 765
 766    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 767        let worktree_id = worktree.read(cx).id();
 768        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 769
 770        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 771        let handle = if push_strong_handle {
 772            WorktreeHandle::Strong(worktree.clone())
 773        } else {
 774            WorktreeHandle::Weak(worktree.downgrade())
 775        };
 776        if self.worktrees_reordered {
 777            self.worktrees.push(handle);
 778        } else {
 779            let i = match self
 780                .worktrees
 781                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 782                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 783                }) {
 784                Ok(i) | Err(i) => i,
 785            };
 786            self.worktrees.insert(i, handle);
 787        }
 788
 789        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 790        self.send_project_updates(cx);
 791
 792        let handle_id = worktree.entity_id();
 793        cx.subscribe(worktree, |_, worktree, event, cx| {
 794            let worktree_id = worktree.read(cx).id();
 795            match event {
 796                worktree::Event::UpdatedEntries(changes) => {
 797                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 798                        worktree_id,
 799                        changes.clone(),
 800                    ));
 801                }
 802                worktree::Event::UpdatedGitRepositories(set) => {
 803                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 804                        worktree_id,
 805                        set.clone(),
 806                    ));
 807                }
 808                worktree::Event::DeletedEntry(id) => {
 809                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 810                }
 811            }
 812        })
 813        .detach();
 814        cx.observe_release(worktree, move |this, worktree, cx| {
 815            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 816                handle_id,
 817                worktree.id(),
 818            ));
 819            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 820                handle_id,
 821                worktree.id(),
 822            ));
 823            this.send_project_updates(cx);
 824        })
 825        .detach();
 826    }
 827
 828    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 829        self.worktrees.retain(|worktree| {
 830            if let Some(worktree) = worktree.upgrade() {
 831                if worktree.read(cx).id() == id_to_remove {
 832                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 833                        worktree.entity_id(),
 834                        id_to_remove,
 835                    ));
 836                    false
 837                } else {
 838                    true
 839                }
 840            } else {
 841                false
 842            }
 843        });
 844        self.update_initial_scan_state(cx);
 845        self.send_project_updates(cx);
 846    }
 847
 848    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 849        self.worktrees_reordered = worktrees_reordered;
 850    }
 851
 852    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 853        match &self.state {
 854            WorktreeStoreState::Remote {
 855                upstream_client,
 856                upstream_project_id,
 857                ..
 858            } => Some((upstream_client.clone(), *upstream_project_id)),
 859            WorktreeStoreState::Local { .. } => None,
 860        }
 861    }
 862
 863    pub fn set_worktrees_from_proto(
 864        &mut self,
 865        worktrees: Vec<proto::WorktreeMetadata>,
 866        replica_id: ReplicaId,
 867        cx: &mut Context<Self>,
 868    ) -> Result<()> {
 869        let mut old_worktrees_by_id = self
 870            .worktrees
 871            .drain(..)
 872            .filter_map(|worktree| {
 873                let worktree = worktree.upgrade()?;
 874                Some((worktree.read(cx).id(), worktree))
 875            })
 876            .collect::<HashMap<_, _>>();
 877
 878        let (client, project_id) = self.upstream_client().context("invalid project")?;
 879
 880        for worktree in worktrees {
 881            if let Some(old_worktree) =
 882                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 883            {
 884                let push_strong_handle =
 885                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 886                let handle = if push_strong_handle {
 887                    WorktreeHandle::Strong(old_worktree.clone())
 888                } else {
 889                    WorktreeHandle::Weak(old_worktree.downgrade())
 890                };
 891                self.worktrees.push(handle);
 892            } else {
 893                self.add(
 894                    &Worktree::remote(
 895                        project_id,
 896                        replica_id,
 897                        worktree,
 898                        client.clone(),
 899                        self.path_style(),
 900                        cx,
 901                    ),
 902                    cx,
 903                );
 904            }
 905        }
 906        self.send_project_updates(cx);
 907
 908        Ok(())
 909    }
 910
 911    pub fn move_worktree(
 912        &mut self,
 913        source: WorktreeId,
 914        destination: WorktreeId,
 915        cx: &mut Context<Self>,
 916    ) -> Result<()> {
 917        if source == destination {
 918            return Ok(());
 919        }
 920
 921        let mut source_index = None;
 922        let mut destination_index = None;
 923        for (i, worktree) in self.worktrees.iter().enumerate() {
 924            if let Some(worktree) = worktree.upgrade() {
 925                let worktree_id = worktree.read(cx).id();
 926                if worktree_id == source {
 927                    source_index = Some(i);
 928                    if destination_index.is_some() {
 929                        break;
 930                    }
 931                } else if worktree_id == destination {
 932                    destination_index = Some(i);
 933                    if source_index.is_some() {
 934                        break;
 935                    }
 936                }
 937            }
 938        }
 939
 940        let source_index =
 941            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 942        let destination_index =
 943            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 944
 945        if source_index == destination_index {
 946            return Ok(());
 947        }
 948
 949        let worktree_to_move = self.worktrees.remove(source_index);
 950        self.worktrees.insert(destination_index, worktree_to_move);
 951        self.worktrees_reordered = true;
 952        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 953        cx.notify();
 954        Ok(())
 955    }
 956
 957    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 958        for worktree in &self.worktrees {
 959            if let Some(worktree) = worktree.upgrade() {
 960                worktree.update(cx, |worktree, _| {
 961                    if let Some(worktree) = worktree.as_remote_mut() {
 962                        worktree.disconnected_from_host();
 963                    }
 964                });
 965            }
 966        }
 967    }
 968
 969    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 970        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 971            return;
 972        };
 973
 974        let update = proto::UpdateProject {
 975            project_id,
 976            worktrees: self.worktree_metadata_protos(cx),
 977        };
 978
 979        // collab has bad concurrency guarantees, so we send requests in serial.
 980        let update_project = if downstream_client.is_via_collab() {
 981            Some(downstream_client.request(update))
 982        } else {
 983            downstream_client.send(update).log_err();
 984            None
 985        };
 986        cx.spawn(async move |this, cx| {
 987            if let Some(update_project) = update_project {
 988                update_project.await?;
 989            }
 990
 991            this.update(cx, |this, cx| {
 992                let worktrees = this.worktrees().collect::<Vec<_>>();
 993
 994                for worktree in worktrees {
 995                    worktree.update(cx, |worktree, cx| {
 996                        let client = downstream_client.clone();
 997                        worktree.observe_updates(project_id, cx, {
 998                            move |update| {
 999                                let client = client.clone();
1000                                async move {
1001                                    if client.is_via_collab() {
1002                                        client
1003                                            .request(update)
1004                                            .map(|result| result.log_err().is_some())
1005                                            .await
1006                                    } else {
1007                                        client.send(update).log_err().is_some()
1008                                    }
1009                                }
1010                            }
1011                        });
1012                    });
1013
1014                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
1015                }
1016
1017                anyhow::Ok(())
1018            })
1019        })
1020        .detach_and_log_err(cx);
1021    }
1022
1023    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
1024        self.worktrees()
1025            .map(|worktree| {
1026                let worktree = worktree.read(cx);
1027                proto::WorktreeMetadata {
1028                    id: worktree.id().to_proto(),
1029                    root_name: worktree.root_name_str().to_owned(),
1030                    visible: worktree.is_visible(),
1031                    abs_path: worktree.abs_path().to_string_lossy().into_owned(),
1032                }
1033            })
1034            .collect()
1035    }
1036
1037    pub fn shared(
1038        &mut self,
1039        remote_id: u64,
1040        downstream_client: AnyProtoClient,
1041        cx: &mut Context<Self>,
1042    ) {
1043        self.retain_worktrees = true;
1044        self.downstream_client = Some((downstream_client, remote_id));
1045
1046        // When shared, retain all worktrees
1047        for worktree_handle in self.worktrees.iter_mut() {
1048            match worktree_handle {
1049                WorktreeHandle::Strong(_) => {}
1050                WorktreeHandle::Weak(worktree) => {
1051                    if let Some(worktree) = worktree.upgrade() {
1052                        *worktree_handle = WorktreeHandle::Strong(worktree);
1053                    }
1054                }
1055            }
1056        }
1057        // Only send project updates if we share in a collaborative mode.
1058        // Otherwise we are the remote server which is currently constructing
1059        // worktree store before the client actually has set up its message
1060        // handlers.
1061        if remote_id != REMOTE_SERVER_PROJECT_ID {
1062            self.send_project_updates(cx);
1063        }
1064    }
1065
1066    pub fn unshared(&mut self, cx: &mut Context<Self>) {
1067        self.retain_worktrees = false;
1068        self.downstream_client.take();
1069
1070        // When not shared, only retain the visible worktrees
1071        for worktree_handle in self.worktrees.iter_mut() {
1072            if let WorktreeHandle::Strong(worktree) = worktree_handle {
1073                let is_visible = worktree.update(cx, |worktree, _| {
1074                    worktree.stop_observing_updates();
1075                    worktree.is_visible()
1076                });
1077                if !is_visible {
1078                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1079                }
1080            }
1081        }
1082    }
1083
1084    pub async fn handle_create_project_entry(
1085        this: Entity<Self>,
1086        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1087        mut cx: AsyncApp,
1088    ) -> Result<proto::ProjectEntryResponse> {
1089        let worktree = this.update(&mut cx, |this, cx| {
1090            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1091            this.worktree_for_id(worktree_id, cx)
1092                .context("worktree not found")
1093        })?;
1094        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1095    }
1096
1097    pub async fn handle_copy_project_entry(
1098        this: Entity<Self>,
1099        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1100        mut cx: AsyncApp,
1101    ) -> Result<proto::ProjectEntryResponse> {
1102        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1103        let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1104        let new_project_path = (
1105            new_worktree_id,
1106            RelPath::from_proto(&envelope.payload.new_path)?,
1107        );
1108        let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1109            let Some((_, project_id)) = this.downstream_client else {
1110                bail!("no downstream client")
1111            };
1112            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1113                bail!("no such entry");
1114            };
1115            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1116                bail!("entry is private")
1117            }
1118
1119            let new_worktree = this
1120                .worktree_for_id(new_worktree_id, cx)
1121                .context("no such worktree")?;
1122            let scan_id = new_worktree.read(cx).scan_id();
1123            anyhow::Ok((
1124                scan_id,
1125                this.copy_entry(entry_id, new_project_path.into(), cx),
1126            ))
1127        })?;
1128        let entry = entry.await?;
1129        Ok(proto::ProjectEntryResponse {
1130            entry: entry.as_ref().map(|entry| entry.into()),
1131            worktree_scan_id: scan_id as u64,
1132        })
1133    }
1134
1135    pub async fn handle_delete_project_entry(
1136        this: Entity<Self>,
1137        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1138        mut cx: AsyncApp,
1139    ) -> Result<proto::ProjectEntryResponse> {
1140        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1141        let worktree = this.update(&mut cx, |this, cx| {
1142            let Some((_, project_id)) = this.downstream_client else {
1143                bail!("no downstream client")
1144            };
1145            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1146                bail!("no entry")
1147            };
1148            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1149                bail!("entry is private")
1150            }
1151            this.worktree_for_entry(entry_id, cx)
1152                .context("worktree not found")
1153        })?;
1154        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1155    }
1156
1157    pub async fn handle_rename_project_entry(
1158        this: Entity<Self>,
1159        request: proto::RenameProjectEntry,
1160        mut cx: AsyncApp,
1161    ) -> Result<proto::ProjectEntryResponse> {
1162        let entry_id = ProjectEntryId::from_proto(request.entry_id);
1163        let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1164        let rel_path = RelPath::from_proto(&request.new_path)
1165            .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1166
1167        let (scan_id, task) = this.update(&mut cx, |this, cx| {
1168            let worktree = this
1169                .worktree_for_entry(entry_id, cx)
1170                .context("no such worktree")?;
1171
1172            let Some((_, project_id)) = this.downstream_client else {
1173                bail!("no downstream client")
1174            };
1175            let entry = worktree
1176                .read(cx)
1177                .entry_for_id(entry_id)
1178                .ok_or_else(|| anyhow!("missing entry"))?;
1179            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1180                bail!("entry is private")
1181            }
1182
1183            let scan_id = worktree.read(cx).scan_id();
1184            anyhow::Ok((
1185                scan_id,
1186                this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1187            ))
1188        })?;
1189        Ok(proto::ProjectEntryResponse {
1190            entry: match &task.await? {
1191                CreatedEntry::Included(entry) => Some(entry.into()),
1192                CreatedEntry::Excluded { .. } => None,
1193            },
1194            worktree_scan_id: scan_id as u64,
1195        })
1196    }
1197
1198    pub async fn handle_expand_project_entry(
1199        this: Entity<Self>,
1200        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1201        mut cx: AsyncApp,
1202    ) -> Result<proto::ExpandProjectEntryResponse> {
1203        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1204        let worktree = this
1205            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))
1206            .context("invalid request")?;
1207        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1208    }
1209
1210    pub async fn handle_expand_all_for_project_entry(
1211        this: Entity<Self>,
1212        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1213        mut cx: AsyncApp,
1214    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1215        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1216        let worktree = this
1217            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))
1218            .context("invalid request")?;
1219        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1220    }
1221
1222    pub async fn handle_allocate_worktree_id(
1223        _this: Entity<Self>,
1224        _envelope: TypedEnvelope<proto::AllocateWorktreeId>,
1225        cx: AsyncApp,
1226    ) -> Result<proto::AllocateWorktreeIdResponse> {
1227        let worktree_id = cx.update(|cx| WorktreeIdCounter::get(cx).next());
1228        Ok(proto::AllocateWorktreeIdResponse { worktree_id })
1229    }
1230
1231    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1232        match &self.state {
1233            WorktreeStoreState::Local { fs } => Some(fs.clone()),
1234            WorktreeStoreState::Remote { .. } => None,
1235        }
1236    }
1237}
1238
1239#[derive(Clone, Debug)]
1240enum WorktreeHandle {
1241    Strong(Entity<Worktree>),
1242    Weak(WeakEntity<Worktree>),
1243}
1244
1245impl WorktreeHandle {
1246    fn upgrade(&self) -> Option<Entity<Worktree>> {
1247        match self {
1248            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1249            WorktreeHandle::Weak(handle) => handle.upgrade(),
1250        }
1251    }
1252}