worktree_store.rs

   1use std::{
   2    future::Future,
   3    path::{Path, PathBuf},
   4    sync::{
   5        Arc,
   6        atomic::{AtomicU64, AtomicUsize},
   7    },
   8};
   9
  10use anyhow::{Context as _, Result, anyhow, bail};
  11use collections::HashMap;
  12use fs::{Fs, copy_recursive};
  13use futures::{FutureExt, future::Shared};
  14use gpui::{
  15    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Global, Task,
  16    WeakEntity,
  17};
  18use itertools::Either;
  19use postage::{prelude::Stream as _, watch};
  20use rpc::{
  21    AnyProtoClient, ErrorExt, TypedEnvelope,
  22    proto::{self, REMOTE_SERVER_PROJECT_ID},
  23};
  24use text::ReplicaId;
  25use util::{
  26    ResultExt,
  27    paths::{PathStyle, RemotePathBuf, SanitizedPath},
  28    rel_path::RelPath,
  29};
  30use worktree::{
  31    CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
  32    WorktreeId,
  33};
  34
  35use crate::{ProjectPath, trusted_worktrees::TrustedWorktrees};
  36
/// Backing mode of a [`WorktreeStore`]: it either operates on a filesystem
/// directly or forwards operations to an upstream peer.
enum WorktreeStoreState {
    /// Worktrees are created and modified through `fs`.
    Local {
        fs: Arc<dyn Fs>,
    },
    /// Worktree operations are sent as requests to `upstream_client`.
    Remote {
        upstream_client: AnyProtoClient,
        // Project id attached to entry requests sent to `upstream_client`.
        upstream_project_id: u64,
        // Path style reported by `WorktreeStore::path_style` for this store.
        path_style: PathStyle,
    },
}
  47
/// Clonable handle to a shared atomic counter used to allocate worktree ids.
/// Clones share the same underlying counter.
#[derive(Clone)]
pub struct WorktreeIdCounter(Arc<AtomicU64>);
  50
  51impl Default for WorktreeIdCounter {
  52    fn default() -> Self {
  53        Self(Arc::new(AtomicU64::new(1)))
  54    }
  55}
  56
  57impl WorktreeIdCounter {
  58    pub fn get(cx: &mut App) -> Self {
  59        cx.default_global::<Self>().clone()
  60    }
  61
  62    fn next(&self) -> u64 {
  63        self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
  64    }
  65}
  66
// Allows the counter to be stored as an app global (see `WorktreeIdCounter::get`).
impl Global for WorktreeIdCounter {}
  68
/// Owns the set of worktrees of a project and tracks worktrees that are
/// still being created, as well as whether their initial scans have finished.
pub struct WorktreeStore {
    // Shared entry-id counter handed to every local worktree on creation.
    next_entry_id: Arc<AtomicUsize>,
    // Counter used when this store allocates worktree ids itself.
    next_worktree_id: WorktreeIdCounter,
    // Optional downstream peer as `(client, project_id)`.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // When true, `add` keeps a strong handle to every worktree, not only visible ones.
    retain_worktrees: bool,
    worktrees: Vec<WorktreeHandle>,
    // When true, `add` appends new worktrees instead of inserting them at the
    // position found by a binary search on absolute path.
    worktrees_reordered: bool,
    // Passed to local worktrees on creation; cleared by `disable_scanner`.
    scanning_enabled: bool,
    // In-flight worktree creations keyed by path. Tasks are shared so that
    // concurrent requests for the same path await the same creation.
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
    // Watch channel holding the aggregate "initial scan complete" flag.
    initial_scan_complete: (watch::Sender<bool>, watch::Receiver<bool>),
    state: WorktreeStoreState,
}
  83
/// Events emitted by [`WorktreeStore`].
#[derive(Debug)]
pub enum WorktreeStoreEvent {
    /// Emitted by `WorktreeStore::add` after a worktree is registered.
    WorktreeAdded(Entity<Worktree>),
    WorktreeRemoved(EntityId, WorktreeId),
    WorktreeReleased(EntityId, WorktreeId),
    WorktreeOrderChanged,
    WorktreeUpdateSent(Entity<Worktree>),
    /// Forwarded when a worktree reports `worktree::Event::UpdatedEntries`.
    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
}
  95
// `WorktreeStore` notifies its subscribers through `WorktreeStoreEvent`.
impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  97
  98impl WorktreeStore {
    /// Registers this store's entity request handlers on `client`: creating,
    /// copying, deleting and expanding project entries.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_create_project_entry);
        client.add_entity_request_handler(Self::handle_copy_project_entry);
        client.add_entity_request_handler(Self::handle_delete_project_entry);
        client.add_entity_request_handler(Self::handle_expand_project_entry);
        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
    }
 106
    /// Registers the handler that answers worktree-id allocation requests on `client`.
    pub fn init_remote(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_allocate_worktree_id);
    }
 110
 111    pub fn local(
 112        retain_worktrees: bool,
 113        fs: Arc<dyn Fs>,
 114        next_worktree_id: WorktreeIdCounter,
 115    ) -> Self {
 116        Self {
 117            next_entry_id: Default::default(),
 118            next_worktree_id,
 119            loading_worktrees: Default::default(),
 120            downstream_client: None,
 121            worktrees: Vec::new(),
 122            worktrees_reordered: false,
 123            scanning_enabled: true,
 124            retain_worktrees,
 125            initial_scan_complete: watch::channel_with(true),
 126            state: WorktreeStoreState::Local { fs },
 127        }
 128    }
 129
 130    pub fn remote(
 131        retain_worktrees: bool,
 132        upstream_client: AnyProtoClient,
 133        upstream_project_id: u64,
 134        path_style: PathStyle,
 135        next_worktree_id: WorktreeIdCounter,
 136    ) -> Self {
 137        Self {
 138            next_entry_id: Default::default(),
 139            next_worktree_id,
 140            loading_worktrees: Default::default(),
 141            downstream_client: None,
 142            worktrees: Vec::new(),
 143            worktrees_reordered: false,
 144            scanning_enabled: true,
 145            retain_worktrees,
 146            initial_scan_complete: watch::channel_with(true),
 147            state: WorktreeStoreState::Remote {
 148                upstream_client,
 149                upstream_project_id,
 150                path_style,
 151            },
 152        }
 153    }
 154
 155    pub fn next_worktree_id(&self) -> impl Future<Output = Result<WorktreeId>> + use<> {
 156        let strategy = match (&self.state, &self.downstream_client) {
 157            // we are a remote server, the client is in charge of assigning worktree ids
 158            (WorktreeStoreState::Local { .. }, Some((client, REMOTE_SERVER_PROJECT_ID))) => {
 159                Either::Left(client.clone())
 160            }
 161            // we are just a local zed project, we can assign ids
 162            (WorktreeStoreState::Local { .. }, _) => Either::Right(self.next_worktree_id.next()),
 163            // we are connected to a remote server, we are in charge of assigning worktree ids
 164            (WorktreeStoreState::Remote { .. }, _) => Either::Right(self.next_worktree_id.next()),
 165        };
 166        async move {
 167            match strategy {
 168                Either::Left(client) => Ok(client
 169                    .request(proto::AllocateWorktreeId {
 170                        project_id: REMOTE_SERVER_PROJECT_ID,
 171                    })
 172                    .await?
 173                    .worktree_id),
 174                Either::Right(id) => Ok(id),
 175            }
 176            .map(WorktreeId::from_proto)
 177        }
 178    }
 179
 180    pub fn disable_scanner(&mut self) {
 181        self.scanning_enabled = false;
 182        *self.initial_scan_complete.0.borrow_mut() = true;
 183    }
 184
 185    /// Returns a future that resolves when all visible worktrees have completed
 186    /// their initial scan (entries populated, git repos detected).
 187    pub fn wait_for_initial_scan(&self) -> impl Future<Output = ()> + use<> {
 188        let mut rx = self.initial_scan_complete.1.clone();
 189        async move {
 190            let mut done = *rx.borrow();
 191            while !done {
 192                if let Some(value) = rx.recv().await {
 193                    done = value;
 194                } else {
 195                    break;
 196                }
 197            }
 198        }
 199    }
 200
 201    /// Returns whether all visible worktrees have completed their initial scan.
 202    pub fn initial_scan_completed(&self) -> bool {
 203        *self.initial_scan_complete.1.borrow()
 204    }
 205
 206    /// Checks whether all visible worktrees have completed their initial scan
 207    /// and no worktree creations are pending, and updates the watch channel accordingly.
 208    fn update_initial_scan_state(&mut self, cx: &App) {
 209        let complete = self.loading_worktrees.is_empty()
 210            && self
 211                .visible_worktrees(cx)
 212                .all(|wt| wt.read(cx).completed_scan_id() >= 1);
 213        *self.initial_scan_complete.0.borrow_mut() = complete;
 214    }
 215
 216    /// Spawns a detached task that waits for a worktree's initial scan to complete,
 217    /// then rechecks and updates the aggregate initial scan state.
 218    fn observe_worktree_scan_completion(
 219        &mut self,
 220        worktree: &Entity<Worktree>,
 221        cx: &mut Context<Self>,
 222    ) {
 223        let await_scan = worktree.update(cx, |worktree, _cx| worktree.wait_for_snapshot(1));
 224        cx.spawn(async move |this, cx| {
 225            await_scan.await.ok();
 226            this.update(cx, |this, cx| {
 227                this.update_initial_scan_state(cx);
 228            })
 229            .ok();
 230            anyhow::Ok(())
 231        })
 232        .detach();
 233    }
 234
 235    /// Iterates through all worktrees, including ones that don't appear in the project panel
 236    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 237        self.worktrees
 238            .iter()
 239            .filter_map(move |worktree| worktree.upgrade())
 240    }
 241
 242    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 243    pub fn visible_worktrees<'a>(
 244        &'a self,
 245        cx: &'a App,
 246    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 247        self.worktrees()
 248            .filter(|worktree| worktree.read(cx).is_visible())
 249    }
 250
 251    /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
 252    pub fn visible_worktrees_and_single_files<'a>(
 253        &'a self,
 254        cx: &'a App,
 255    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 256        self.worktrees()
 257            .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
 258    }
 259
 260    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 261        self.worktrees()
 262            .find(|worktree| worktree.read(cx).id() == id)
 263    }
 264
 265    pub fn worktree_for_entry(
 266        &self,
 267        entry_id: ProjectEntryId,
 268        cx: &App,
 269    ) -> Option<Entity<Worktree>> {
 270        self.worktrees()
 271            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 272    }
 273
 274    pub fn find_worktree(
 275        &self,
 276        abs_path: impl AsRef<Path>,
 277        cx: &App,
 278    ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
 279        let abs_path = SanitizedPath::new(abs_path.as_ref());
 280        for tree in self.worktrees() {
 281            let path_style = tree.read(cx).path_style();
 282            if let Some(relative_path) =
 283                path_style.strip_prefix(abs_path.as_ref(), tree.read(cx).abs_path().as_ref())
 284            {
 285                return Some((tree.clone(), relative_path.into_arc()));
 286            }
 287        }
 288        None
 289    }
 290
 291    pub fn project_path_for_absolute_path(&self, abs_path: &Path, cx: &App) -> Option<ProjectPath> {
 292        self.find_worktree(abs_path, cx)
 293            .map(|(worktree, relative_path)| ProjectPath {
 294                worktree_id: worktree.read(cx).id(),
 295                path: relative_path,
 296            })
 297    }
 298
 299    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
 300        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 301        Some(worktree.read(cx).absolutize(&project_path.path))
 302    }
 303
 304    pub fn path_style(&self) -> PathStyle {
 305        match &self.state {
 306            WorktreeStoreState::Local { .. } => PathStyle::local(),
 307            WorktreeStoreState::Remote { path_style, .. } => *path_style,
 308        }
 309    }
 310
 311    pub fn find_or_create_worktree(
 312        &mut self,
 313        abs_path: impl AsRef<Path>,
 314        visible: bool,
 315        cx: &mut Context<Self>,
 316    ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
 317        let abs_path = abs_path.as_ref();
 318        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 319            Task::ready(Ok((tree, relative_path)))
 320        } else {
 321            let worktree = self.create_worktree(abs_path, visible, cx);
 322            cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
 323        }
 324    }
 325
 326    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 327        self.worktrees()
 328            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 329    }
 330
 331    pub fn worktree_and_entry_for_id<'a>(
 332        &'a self,
 333        entry_id: ProjectEntryId,
 334        cx: &'a App,
 335    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 336        self.worktrees().find_map(|worktree| {
 337            worktree
 338                .read(cx)
 339                .entry_for_id(entry_id)
 340                .map(|e| (worktree.clone(), e))
 341        })
 342    }
 343
 344    pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
 345        self.worktree_for_id(path.worktree_id, cx)?
 346            .read(cx)
 347            .entry_for_path(&path.path)
 348    }
 349
 350    pub fn copy_entry(
 351        &mut self,
 352        entry_id: ProjectEntryId,
 353        new_project_path: ProjectPath,
 354        cx: &mut Context<Self>,
 355    ) -> Task<Result<Option<Entry>>> {
 356        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 357            return Task::ready(Err(anyhow!("no such worktree")));
 358        };
 359        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
 360            return Task::ready(Err(anyhow!("no such entry")));
 361        };
 362        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 363            return Task::ready(Err(anyhow!("no such worktree")));
 364        };
 365
 366        match &self.state {
 367            WorktreeStoreState::Local { fs } => {
 368                let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
 369                let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
 370                let fs = fs.clone();
 371                let copy = cx.background_spawn(async move {
 372                    copy_recursive(
 373                        fs.as_ref(),
 374                        &old_abs_path,
 375                        &new_abs_path,
 376                        Default::default(),
 377                    )
 378                    .await
 379                });
 380
 381                cx.spawn(async move |_, cx| {
 382                    copy.await?;
 383                    new_worktree
 384                        .update(cx, |this, cx| {
 385                            this.as_local_mut().unwrap().refresh_entry(
 386                                new_project_path.path,
 387                                None,
 388                                cx,
 389                            )
 390                        })
 391                        .await
 392                })
 393            }
 394            WorktreeStoreState::Remote {
 395                upstream_client,
 396                upstream_project_id,
 397                ..
 398            } => {
 399                let response = upstream_client.request(proto::CopyProjectEntry {
 400                    project_id: *upstream_project_id,
 401                    entry_id: entry_id.to_proto(),
 402                    new_path: new_project_path.path.to_proto(),
 403                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 404                });
 405                cx.spawn(async move |_, cx| {
 406                    let response = response.await?;
 407                    match response.entry {
 408                        Some(entry) => new_worktree
 409                            .update(cx, |worktree, cx| {
 410                                worktree.as_remote_mut().unwrap().insert_entry(
 411                                    entry,
 412                                    response.worktree_scan_id as usize,
 413                                    cx,
 414                                )
 415                            })
 416                            .await
 417                            .map(Some),
 418                        None => Ok(None),
 419                    }
 420                })
 421            }
 422        }
 423    }
 424
 425    pub fn rename_entry(
 426        &mut self,
 427        entry_id: ProjectEntryId,
 428        new_project_path: ProjectPath,
 429        cx: &mut Context<Self>,
 430    ) -> Task<Result<CreatedEntry>> {
 431        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 432            return Task::ready(Err(anyhow!("no such worktree")));
 433        };
 434        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
 435            return Task::ready(Err(anyhow!("no such entry")));
 436        };
 437        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 438            return Task::ready(Err(anyhow!("no such worktree")));
 439        };
 440
 441        match &self.state {
 442            WorktreeStoreState::Local { fs } => {
 443                let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
 444                let new_worktree_ref = new_worktree.read(cx);
 445                let is_root_entry = new_worktree_ref
 446                    .root_entry()
 447                    .is_some_and(|e| e.id == entry_id);
 448                let abs_new_path = if is_root_entry {
 449                    let abs_path = new_worktree_ref.abs_path();
 450                    let Some(root_parent_path) = abs_path.parent() else {
 451                        return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
 452                    };
 453                    root_parent_path.join(new_project_path.path.as_std_path())
 454                } else {
 455                    new_worktree_ref.absolutize(&new_project_path.path)
 456                };
 457
 458                let fs = fs.clone();
 459                let case_sensitive = new_worktree
 460                    .read(cx)
 461                    .as_local()
 462                    .unwrap()
 463                    .fs_is_case_sensitive();
 464
 465                let do_rename =
 466                    async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
 467                        fs.rename(
 468                            &old_path,
 469                            &new_path,
 470                            fs::RenameOptions {
 471                                overwrite,
 472                                ..fs::RenameOptions::default()
 473                            },
 474                        )
 475                        .await
 476                        .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
 477                    };
 478
 479                let rename = cx.background_spawn({
 480                    let abs_new_path = abs_new_path.clone();
 481                    async move {
 482                        // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
 483                        // we want to overwrite, because otherwise we run into a file-already-exists error.
 484                        let overwrite = !case_sensitive
 485                            && abs_old_path != abs_new_path
 486                            && abs_old_path.to_str().map(|p| p.to_lowercase())
 487                                == abs_new_path.to_str().map(|p| p.to_lowercase());
 488
 489                        // The directory we're renaming into might not exist yet
 490                        if let Err(e) =
 491                            do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
 492                        {
 493                            if let Some(err) = e.downcast_ref::<std::io::Error>()
 494                                && err.kind() == std::io::ErrorKind::NotFound
 495                            {
 496                                if let Some(parent) = abs_new_path.parent() {
 497                                    fs.create_dir(parent).await.with_context(|| {
 498                                        format!("creating parent directory {parent:?}")
 499                                    })?;
 500                                    return do_rename(
 501                                        fs.as_ref(),
 502                                        &abs_old_path,
 503                                        &abs_new_path,
 504                                        overwrite,
 505                                    )
 506                                    .await;
 507                                }
 508                            }
 509                            return Err(e);
 510                        }
 511                        Ok(())
 512                    }
 513                });
 514
 515                cx.spawn(async move |_, cx| {
 516                    rename.await?;
 517                    Ok(new_worktree
 518                        .update(cx, |this, cx| {
 519                            let local = this.as_local_mut().unwrap();
 520                            if is_root_entry {
 521                                // We eagerly update `abs_path` and refresh this worktree.
 522                                // Otherwise, the FS watcher would do it on the `RootUpdated` event,
 523                                // but with a noticeable delay, so we handle it proactively.
 524                                local.update_abs_path_and_refresh(
 525                                    SanitizedPath::new_arc(&abs_new_path),
 526                                    cx,
 527                                );
 528                                Task::ready(Ok(this.root_entry().cloned()))
 529                            } else {
 530                                // First refresh the parent directory (in case it was newly created)
 531                                if let Some(parent) = new_project_path.path.parent() {
 532                                    let _ = local.refresh_entries_for_paths(vec![parent.into()]);
 533                                }
 534                                // Then refresh the new path
 535                                local.refresh_entry(
 536                                    new_project_path.path.clone(),
 537                                    Some(old_entry.path),
 538                                    cx,
 539                                )
 540                            }
 541                        })
 542                        .await?
 543                        .map(CreatedEntry::Included)
 544                        .unwrap_or_else(|| CreatedEntry::Excluded {
 545                            abs_path: abs_new_path,
 546                        }))
 547                })
 548            }
 549            WorktreeStoreState::Remote {
 550                upstream_client,
 551                upstream_project_id,
 552                ..
 553            } => {
 554                let response = upstream_client.request(proto::RenameProjectEntry {
 555                    project_id: *upstream_project_id,
 556                    entry_id: entry_id.to_proto(),
 557                    new_path: new_project_path.path.to_proto(),
 558                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 559                });
 560                cx.spawn(async move |_, cx| {
 561                    let response = response.await?;
 562                    match response.entry {
 563                        Some(entry) => new_worktree
 564                            .update(cx, |worktree, cx| {
 565                                worktree.as_remote_mut().unwrap().insert_entry(
 566                                    entry,
 567                                    response.worktree_scan_id as usize,
 568                                    cx,
 569                                )
 570                            })
 571                            .await
 572                            .map(CreatedEntry::Included),
 573                        None => {
 574                            let abs_path = new_worktree.read_with(cx, |worktree, _| {
 575                                worktree.absolutize(&new_project_path.path)
 576                            });
 577                            Ok(CreatedEntry::Excluded { abs_path })
 578                        }
 579                    }
 580                })
 581            }
 582        }
 583    }
    /// Creates a worktree rooted at `abs_path`, or joins a creation for the
    /// same path that is already in flight.
    ///
    /// Creation tasks are stored in `loading_worktrees` as shared futures, so
    /// concurrent calls for one path all await the same underlying task.
    /// Starting a visible creation while scanning is enabled marks the store's
    /// initial scan as incomplete until the new worktree's first scan is
    /// observed.
    ///
    /// # Errors
    ///
    /// Resolves to an error when the store is connected via collab (worktrees
    /// cannot be created there) or when the underlying local/remote creation
    /// fails.
    pub fn create_worktree(
        &mut self,
        abs_path: impl AsRef<Path>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>>> {
        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
        let is_via_collab = matches!(&self.state, WorktreeStoreState::Remote { upstream_client, .. } if upstream_client.is_via_collab());
        // Only start a new creation when none is in flight for this path.
        if !self.loading_worktrees.contains_key(&abs_path) {
            let task = match &self.state {
                WorktreeStoreState::Remote {
                    upstream_client,
                    path_style,
                    ..
                } => {
                    if upstream_client.is_via_collab() {
                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
                    } else {
                        let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
                        self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
                    }
                }
                WorktreeStoreState::Local { fs } => {
                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
                }
            };

            self.loading_worktrees
                .insert(abs_path.clone(), task.shared());

            // A visible worktree that will be scanned makes the aggregate
            // initial scan incomplete until its first scan finishes.
            if visible && self.scanning_enabled {
                *self.initial_scan_complete.0.borrow_mut() = false;
            }
        }
        // The entry exists here: it was either already present or just inserted above.
        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
        cx.spawn(async move |this, cx| {
            let result = task.await;
            this.update(cx, |this, cx| {
                this.loading_worktrees.remove(&abs_path);
                // When no scan completion will be observed for this worktree
                // (invisible, scanning disabled, or creation failed), recompute
                // the aggregate state right away.
                if !visible || !this.scanning_enabled || result.is_err() {
                    this.update_initial_scan_state(cx);
                }
            })
            .ok();

            match result {
                Ok(worktree) => {
                    if !is_via_collab {
                        // If a `TrustedWorktrees` global exists, run its trust
                        // check for the new worktree.
                        if let Some((trusted_worktrees, worktree_store)) = this
                            .update(cx, |_, cx| {
                                TrustedWorktrees::try_get_global(cx).zip(Some(cx.entity()))
                            })
                            .ok()
                            .flatten()
                        {
                            trusted_worktrees.update(cx, |trusted_worktrees, cx| {
                                trusted_worktrees.can_trust(
                                    &worktree_store,
                                    worktree.read(cx).id(),
                                    cx,
                                );
                            });
                        }

                        // Update the aggregate state once this worktree's
                        // first scan completes.
                        this.update(cx, |this, cx| {
                            if this.scanning_enabled && visible {
                                this.observe_worktree_scan_completion(&worktree, cx);
                            }
                        })
                        .ok();
                    }
                    Ok(worktree)
                }
                // The shared task yields `Arc<anyhow::Error>`; produce an owned error for the caller.
                Err(err) => Err((*err).cloned()),
            }
        })
    }
 661
 662    fn create_remote_worktree(
 663        &mut self,
 664        client: AnyProtoClient,
 665        abs_path: RemotePathBuf,
 666        visible: bool,
 667        cx: &mut Context<Self>,
 668    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 669        let path_style = abs_path.path_style();
 670        let mut abs_path = abs_path.to_string();
 671        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 672        // in which case want to strip the leading the `/`.
 673        // On the host-side, the `~` will get expanded.
 674        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 675        if abs_path.starts_with("/~") {
 676            abs_path = abs_path[1..].to_string();
 677        }
 678        if abs_path.is_empty() {
 679            abs_path = "~/".to_string();
 680        }
 681
 682        cx.spawn(async move |this, cx| {
 683            let this = this.upgrade().context("Dropped worktree store")?;
 684
 685            let path = RemotePathBuf::new(abs_path, path_style);
 686            let response = client
 687                .request(proto::AddWorktree {
 688                    project_id: REMOTE_SERVER_PROJECT_ID,
 689                    path: path.to_proto(),
 690                    visible,
 691                })
 692                .await?;
 693
 694            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 695                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 696            }) {
 697                return Ok(existing_worktree);
 698            }
 699
 700            let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
 701            let root_name = root_path_buf
 702                .file_name()
 703                .map(|n| n.to_string_lossy().into_owned())
 704                .unwrap_or(root_path_buf.to_string_lossy().into_owned());
 705
 706            let worktree = cx.update(|cx| {
 707                Worktree::remote(
 708                    REMOTE_SERVER_PROJECT_ID,
 709                    ReplicaId::REMOTE_SERVER,
 710                    proto::WorktreeMetadata {
 711                        id: response.worktree_id,
 712                        root_name,
 713                        visible,
 714                        abs_path: response.canonicalized_path,
 715                    },
 716                    client,
 717                    path_style,
 718                    cx,
 719                )
 720            });
 721
 722            this.update(cx, |this, cx| {
 723                this.add(&worktree, cx);
 724            });
 725            Ok(worktree)
 726        })
 727    }
 728
 729    fn create_local_worktree(
 730        &mut self,
 731        fs: Arc<dyn Fs>,
 732        abs_path: Arc<SanitizedPath>,
 733        visible: bool,
 734        cx: &mut Context<Self>,
 735    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 736        let next_entry_id = self.next_entry_id.clone();
 737        let scanning_enabled = self.scanning_enabled;
 738
 739        let next_worktree_id = self.next_worktree_id();
 740
 741        cx.spawn(async move |this, cx| {
 742            let worktree_id = next_worktree_id.await?;
 743            let worktree = Worktree::local(
 744                SanitizedPath::cast_arc(abs_path.clone()),
 745                visible,
 746                fs,
 747                next_entry_id,
 748                scanning_enabled,
 749                worktree_id,
 750                cx,
 751            )
 752            .await?;
 753
 754            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 755
 756            if visible {
 757                cx.update(|cx| {
 758                    cx.add_recent_document(abs_path.as_path());
 759                });
 760            }
 761
 762            Ok(worktree)
 763        })
 764    }
 765
 766    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 767        let worktree_id = worktree.read(cx).id();
 768        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 769
 770        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 771        let handle = if push_strong_handle {
 772            WorktreeHandle::Strong(worktree.clone())
 773        } else {
 774            WorktreeHandle::Weak(worktree.downgrade())
 775        };
 776        if self.worktrees_reordered {
 777            self.worktrees.push(handle);
 778        } else {
 779            let i = match self
 780                .worktrees
 781                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 782                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 783                }) {
 784                Ok(i) | Err(i) => i,
 785            };
 786            self.worktrees.insert(i, handle);
 787        }
 788
 789        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 790        self.send_project_updates(cx);
 791
 792        let handle_id = worktree.entity_id();
 793        cx.subscribe(worktree, |_, worktree, event, cx| {
 794            let worktree_id = worktree.read(cx).id();
 795            match event {
 796                worktree::Event::UpdatedEntries(changes) => {
 797                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 798                        worktree_id,
 799                        changes.clone(),
 800                    ));
 801                }
 802                worktree::Event::UpdatedGitRepositories(set) => {
 803                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 804                        worktree_id,
 805                        set.clone(),
 806                    ));
 807                }
 808                worktree::Event::DeletedEntry(id) => {
 809                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 810                }
 811                worktree::Event::Deleted => {
 812                    // The worktree root itself has been deleted (for single-file worktrees)
 813                    // The worktree will be removed via the observe_release callback
 814                }
 815            }
 816        })
 817        .detach();
 818        cx.observe_release(worktree, move |this, worktree, cx| {
 819            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 820                handle_id,
 821                worktree.id(),
 822            ));
 823            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 824                handle_id,
 825                worktree.id(),
 826            ));
 827            this.send_project_updates(cx);
 828        })
 829        .detach();
 830    }
 831
 832    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 833        self.worktrees.retain(|worktree| {
 834            if let Some(worktree) = worktree.upgrade() {
 835                if worktree.read(cx).id() == id_to_remove {
 836                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 837                        worktree.entity_id(),
 838                        id_to_remove,
 839                    ));
 840                    false
 841                } else {
 842                    true
 843                }
 844            } else {
 845                false
 846            }
 847        });
 848        self.update_initial_scan_state(cx);
 849        self.send_project_updates(cx);
 850    }
 851
    /// Sets the flag recording whether the worktree order has been customized.
    ///
    /// While the flag is set, `add` appends new worktrees to the end of the list instead of
    /// inserting them at the index found by searching on absolute path.
    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }
 855
 856    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 857        match &self.state {
 858            WorktreeStoreState::Remote {
 859                upstream_client,
 860                upstream_project_id,
 861                ..
 862            } => Some((upstream_client.clone(), *upstream_project_id)),
 863            WorktreeStoreState::Local { .. } => None,
 864        }
 865    }
 866
 867    pub fn set_worktrees_from_proto(
 868        &mut self,
 869        worktrees: Vec<proto::WorktreeMetadata>,
 870        replica_id: ReplicaId,
 871        cx: &mut Context<Self>,
 872    ) -> Result<()> {
 873        let mut old_worktrees_by_id = self
 874            .worktrees
 875            .drain(..)
 876            .filter_map(|worktree| {
 877                let worktree = worktree.upgrade()?;
 878                Some((worktree.read(cx).id(), worktree))
 879            })
 880            .collect::<HashMap<_, _>>();
 881
 882        let (client, project_id) = self.upstream_client().context("invalid project")?;
 883
 884        for worktree in worktrees {
 885            if let Some(old_worktree) =
 886                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 887            {
 888                let push_strong_handle =
 889                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 890                let handle = if push_strong_handle {
 891                    WorktreeHandle::Strong(old_worktree.clone())
 892                } else {
 893                    WorktreeHandle::Weak(old_worktree.downgrade())
 894                };
 895                self.worktrees.push(handle);
 896            } else {
 897                self.add(
 898                    &Worktree::remote(
 899                        project_id,
 900                        replica_id,
 901                        worktree,
 902                        client.clone(),
 903                        self.path_style(),
 904                        cx,
 905                    ),
 906                    cx,
 907                );
 908            }
 909        }
 910        self.send_project_updates(cx);
 911
 912        Ok(())
 913    }
 914
 915    pub fn move_worktree(
 916        &mut self,
 917        source: WorktreeId,
 918        destination: WorktreeId,
 919        cx: &mut Context<Self>,
 920    ) -> Result<()> {
 921        if source == destination {
 922            return Ok(());
 923        }
 924
 925        let mut source_index = None;
 926        let mut destination_index = None;
 927        for (i, worktree) in self.worktrees.iter().enumerate() {
 928            if let Some(worktree) = worktree.upgrade() {
 929                let worktree_id = worktree.read(cx).id();
 930                if worktree_id == source {
 931                    source_index = Some(i);
 932                    if destination_index.is_some() {
 933                        break;
 934                    }
 935                } else if worktree_id == destination {
 936                    destination_index = Some(i);
 937                    if source_index.is_some() {
 938                        break;
 939                    }
 940                }
 941            }
 942        }
 943
 944        let source_index =
 945            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 946        let destination_index =
 947            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 948
 949        if source_index == destination_index {
 950            return Ok(());
 951        }
 952
 953        let worktree_to_move = self.worktrees.remove(source_index);
 954        self.worktrees.insert(destination_index, worktree_to_move);
 955        self.worktrees_reordered = true;
 956        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 957        cx.notify();
 958        Ok(())
 959    }
 960
 961    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 962        for worktree in &self.worktrees {
 963            if let Some(worktree) = worktree.upgrade() {
 964                worktree.update(cx, |worktree, _| {
 965                    if let Some(worktree) = worktree.as_remote_mut() {
 966                        worktree.disconnected_from_host();
 967                    }
 968                });
 969            }
 970        }
 971    }
 972
    /// Pushes the current worktree list to the downstream client and starts streaming
    /// per-worktree updates to it.
    ///
    /// Does nothing when there is no downstream client. Otherwise an `UpdateProject` message
    /// carrying the metadata of all worktrees is sent, and a detached task then registers an
    /// update observer on every worktree and emits `WorktreeUpdateSent` for each of them.
    /// Errors from the task are logged rather than returned.
    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
            return;
        };

        let update = proto::UpdateProject {
            project_id,
            worktrees: self.worktree_metadata_protos(cx),
        };

        // collab has bad concurrency guarantees, so we send requests in serial.
        // For collab the request future is kept so the task below can await the response
        // before observing worktrees; otherwise the message is sent right away and any send
        // error is only logged.
        let update_project = if downstream_client.is_via_collab() {
            Some(downstream_client.request(update))
        } else {
            downstream_client.send(update).log_err();
            None
        };
        cx.spawn(async move |this, cx| {
            if let Some(update_project) = update_project {
                update_project.await?;
            }

            this.update(cx, |this, cx| {
                // Collect first so the worktree iterator does not borrow `this` while the
                // worktrees are being updated.
                let worktrees = this.worktrees().collect::<Vec<_>>();

                for worktree in worktrees {
                    worktree.update(cx, |worktree, cx| {
                        let client = downstream_client.clone();
                        worktree.observe_updates(project_id, cx, {
                            // The callback forwards one update and resolves to `true` when
                            // forwarding succeeded, `false` (after logging) when it failed.
                            move |update| {
                                let client = client.clone();
                                async move {
                                    if client.is_via_collab() {
                                        client
                                            .request(update)
                                            .map(|result| result.log_err().is_some())
                                            .await
                                    } else {
                                        client.send(update).log_err().is_some()
                                    }
                                }
                            }
                        });
                    });

                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
                }

                anyhow::Ok(())
            })
        })
        .detach_and_log_err(cx);
    }
1026
1027    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
1028        self.worktrees()
1029            .map(|worktree| {
1030                let worktree = worktree.read(cx);
1031                proto::WorktreeMetadata {
1032                    id: worktree.id().to_proto(),
1033                    root_name: worktree.root_name_str().to_owned(),
1034                    visible: worktree.is_visible(),
1035                    abs_path: worktree.abs_path().to_string_lossy().into_owned(),
1036                }
1037            })
1038            .collect()
1039    }
1040
1041    pub fn shared(
1042        &mut self,
1043        remote_id: u64,
1044        downstream_client: AnyProtoClient,
1045        cx: &mut Context<Self>,
1046    ) {
1047        self.retain_worktrees = true;
1048        self.downstream_client = Some((downstream_client, remote_id));
1049
1050        // When shared, retain all worktrees
1051        for worktree_handle in self.worktrees.iter_mut() {
1052            match worktree_handle {
1053                WorktreeHandle::Strong(_) => {}
1054                WorktreeHandle::Weak(worktree) => {
1055                    if let Some(worktree) = worktree.upgrade() {
1056                        *worktree_handle = WorktreeHandle::Strong(worktree);
1057                    }
1058                }
1059            }
1060        }
1061        // Only send project updates if we share in a collaborative mode.
1062        // Otherwise we are the remote server which is currently constructing
1063        // worktree store before the client actually has set up its message
1064        // handlers.
1065        if remote_id != REMOTE_SERVER_PROJECT_ID {
1066            self.send_project_updates(cx);
1067        }
1068    }
1069
1070    pub fn unshared(&mut self, cx: &mut Context<Self>) {
1071        self.retain_worktrees = false;
1072        self.downstream_client.take();
1073
1074        // When not shared, only retain the visible worktrees
1075        for worktree_handle in self.worktrees.iter_mut() {
1076            if let WorktreeHandle::Strong(worktree) = worktree_handle {
1077                let is_visible = worktree.update(cx, |worktree, _| {
1078                    worktree.stop_observing_updates();
1079                    worktree.is_visible()
1080                });
1081                if !is_visible {
1082                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1083                }
1084            }
1085        }
1086    }
1087
1088    pub async fn handle_create_project_entry(
1089        this: Entity<Self>,
1090        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1091        mut cx: AsyncApp,
1092    ) -> Result<proto::ProjectEntryResponse> {
1093        let worktree = this.update(&mut cx, |this, cx| {
1094            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1095            this.worktree_for_id(worktree_id, cx)
1096                .context("worktree not found")
1097        })?;
1098        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1099    }
1100
1101    pub async fn handle_copy_project_entry(
1102        this: Entity<Self>,
1103        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1104        mut cx: AsyncApp,
1105    ) -> Result<proto::ProjectEntryResponse> {
1106        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1107        let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1108        let new_project_path = (
1109            new_worktree_id,
1110            RelPath::from_proto(&envelope.payload.new_path)?,
1111        );
1112        let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1113            let Some((_, project_id)) = this.downstream_client else {
1114                bail!("no downstream client")
1115            };
1116            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1117                bail!("no such entry");
1118            };
1119            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1120                bail!("entry is private")
1121            }
1122
1123            let new_worktree = this
1124                .worktree_for_id(new_worktree_id, cx)
1125                .context("no such worktree")?;
1126            let scan_id = new_worktree.read(cx).scan_id();
1127            anyhow::Ok((
1128                scan_id,
1129                this.copy_entry(entry_id, new_project_path.into(), cx),
1130            ))
1131        })?;
1132        let entry = entry.await?;
1133        Ok(proto::ProjectEntryResponse {
1134            entry: entry.as_ref().map(|entry| entry.into()),
1135            worktree_scan_id: scan_id as u64,
1136        })
1137    }
1138
1139    pub async fn handle_delete_project_entry(
1140        this: Entity<Self>,
1141        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1142        mut cx: AsyncApp,
1143    ) -> Result<proto::ProjectEntryResponse> {
1144        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1145        let worktree = this.update(&mut cx, |this, cx| {
1146            let Some((_, project_id)) = this.downstream_client else {
1147                bail!("no downstream client")
1148            };
1149            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1150                bail!("no entry")
1151            };
1152            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1153                bail!("entry is private")
1154            }
1155            this.worktree_for_entry(entry_id, cx)
1156                .context("worktree not found")
1157        })?;
1158        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1159    }
1160
1161    pub async fn handle_rename_project_entry(
1162        this: Entity<Self>,
1163        request: proto::RenameProjectEntry,
1164        mut cx: AsyncApp,
1165    ) -> Result<proto::ProjectEntryResponse> {
1166        let entry_id = ProjectEntryId::from_proto(request.entry_id);
1167        let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1168        let rel_path = RelPath::from_proto(&request.new_path)
1169            .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1170
1171        let (scan_id, task) = this.update(&mut cx, |this, cx| {
1172            let worktree = this
1173                .worktree_for_entry(entry_id, cx)
1174                .context("no such worktree")?;
1175
1176            let Some((_, project_id)) = this.downstream_client else {
1177                bail!("no downstream client")
1178            };
1179            let entry = worktree
1180                .read(cx)
1181                .entry_for_id(entry_id)
1182                .ok_or_else(|| anyhow!("missing entry"))?;
1183            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1184                bail!("entry is private")
1185            }
1186
1187            let scan_id = worktree.read(cx).scan_id();
1188            anyhow::Ok((
1189                scan_id,
1190                this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1191            ))
1192        })?;
1193        Ok(proto::ProjectEntryResponse {
1194            entry: match &task.await? {
1195                CreatedEntry::Included(entry) => Some(entry.into()),
1196                CreatedEntry::Excluded { .. } => None,
1197            },
1198            worktree_scan_id: scan_id as u64,
1199        })
1200    }
1201
1202    pub async fn handle_expand_project_entry(
1203        this: Entity<Self>,
1204        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1205        mut cx: AsyncApp,
1206    ) -> Result<proto::ExpandProjectEntryResponse> {
1207        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1208        let worktree = this
1209            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))
1210            .context("invalid request")?;
1211        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1212    }
1213
1214    pub async fn handle_expand_all_for_project_entry(
1215        this: Entity<Self>,
1216        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1217        mut cx: AsyncApp,
1218    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1219        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1220        let worktree = this
1221            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))
1222            .context("invalid request")?;
1223        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1224    }
1225
1226    pub async fn handle_allocate_worktree_id(
1227        _this: Entity<Self>,
1228        _envelope: TypedEnvelope<proto::AllocateWorktreeId>,
1229        cx: AsyncApp,
1230    ) -> Result<proto::AllocateWorktreeIdResponse> {
1231        let worktree_id = cx.update(|cx| WorktreeIdCounter::get(cx).next());
1232        Ok(proto::AllocateWorktreeIdResponse { worktree_id })
1233    }
1234
1235    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1236        match &self.state {
1237            WorktreeStoreState::Local { fs } => Some(fs.clone()),
1238            WorktreeStoreState::Remote { .. } => None,
1239        }
1240    }
1241}
1242
/// A reference to a worktree held by the store, either owning or non-owning.
///
/// The store uses `Strong` for worktrees that are visible or while it retains all worktrees,
/// and `Weak` otherwise.
#[derive(Clone, Debug)]
enum WorktreeHandle {
    /// A strong entity handle to the worktree.
    Strong(Entity<Worktree>),
    /// A weak entity handle; upgrading it yields `None` once the worktree is gone.
    Weak(WeakEntity<Worktree>),
}
1248
1249impl WorktreeHandle {
1250    fn upgrade(&self) -> Option<Entity<Worktree>> {
1251        match self {
1252            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1253            WorktreeHandle::Weak(handle) => handle.upgrade(),
1254        }
1255    }
1256}