worktree_store.rs

   1use std::{
   2    io::{BufRead, BufReader},
   3    path::{Path, PathBuf},
   4    pin::pin,
   5    sync::{Arc, atomic::AtomicUsize},
   6};
   7
   8use anyhow::{Context as _, Result, anyhow, bail};
   9use collections::{HashMap, HashSet};
  10use fs::{Fs, copy_recursive};
  11use futures::{FutureExt, SinkExt, future::Shared};
  12use gpui::{
  13    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
  14};
  15use postage::oneshot;
  16use rpc::{
  17    AnyProtoClient, ErrorExt, TypedEnvelope,
  18    proto::{self, REMOTE_SERVER_PROJECT_ID},
  19};
  20use smol::{
  21    channel::{Receiver, Sender},
  22    stream::StreamExt,
  23};
  24use text::ReplicaId;
  25use util::{
  26    ResultExt,
  27    paths::{PathStyle, RemotePathBuf, SanitizedPath},
  28    rel_path::RelPath,
  29};
  30use worktree::{
  31    CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
  32    WorktreeId, WorktreeSettings,
  33};
  34
  35use crate::{ProjectPath, search::SearchQuery};
  36
/// An entry bundled with a one-shot channel on which its `ProjectPath` can be sent.
///
/// NOTE(review): the code that builds and consumes this type is outside this
/// excerpt; presumably it belongs to the path-search machinery — confirm.
struct MatchingEntry {
    /// Root path of the worktree the entry belongs to.
    worktree_root: Arc<Path>,
    /// Location of the entry within the project.
    path: ProjectPath,
    /// One-shot sender that delivers a `ProjectPath` to the holder of the receiver.
    respond: oneshot::Sender<ProjectPath>,
}
  42
/// Where the worktrees managed by a `WorktreeStore` actually live.
enum WorktreeStoreState {
    /// Worktrees are backed directly by a filesystem.
    Local {
        /// Filesystem used to create local worktrees and to copy/rename entries.
        fs: Arc<dyn Fs>,
    },
    /// Worktrees are owned by an upstream peer and manipulated through RPC requests.
    Remote {
        /// Client used to send requests to the upstream peer.
        upstream_client: AnyProtoClient,
        /// Project id placed in requests sent upstream.
        upstream_project_id: u64,
        /// Path style reported by `WorktreeStore::path_style` for this store.
        path_style: PathStyle,
    },
}
  53
/// Owns the set of worktrees that make up a project, whether local or remote.
pub struct WorktreeStore {
    /// Shared counter handed to every local worktree created by this store.
    next_entry_id: Arc<AtomicUsize>,
    /// Optional `(client, project_id)` pair that project updates are sent to.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// When true, even invisible worktrees are held through strong handles.
    retain_worktrees: bool,
    /// Handles (strong or weak) to the worktrees, in display order.
    worktrees: Vec<WorktreeHandle>,
    /// Set once the order has been changed explicitly; from then on new worktrees
    /// are appended instead of being inserted in `abs_path` order.
    worktrees_reordered: bool,
    /// In-flight worktree creations keyed by sanitized absolute path, shared so that
    /// concurrent requests for the same path await the same task.
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
    /// Local or remote backing for this store.
    state: WorktreeStoreState,
}
  65
/// Events emitted by `WorktreeStore` to its subscribers.
#[derive(Debug)]
pub enum WorktreeStoreEvent {
    /// A worktree was added to the store.
    WorktreeAdded(Entity<Worktree>),
    /// A worktree was removed from the store, either explicitly or because it was released.
    WorktreeRemoved(EntityId, WorktreeId),
    /// A worktree entity tracked by the store was released.
    WorktreeReleased(EntityId, WorktreeId),
    /// The order of the worktrees was changed via `move_worktree`.
    WorktreeOrderChanged,
    // NOTE(review): emitted outside this excerpt; presumably signals that an update
    // for the worktree was sent downstream — confirm.
    WorktreeUpdateSent(Entity<Worktree>),
    /// Forwarded from `worktree::Event::UpdatedEntries`.
    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
    /// Forwarded from `worktree::Event::UpdatedGitRepositories`.
    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
    /// Forwarded from `worktree::Event::DeletedEntry`.
    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
}
  77
// Lets `WorktreeStore` emit `WorktreeStoreEvent`s through `cx.emit`.
impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  79
  80impl WorktreeStore {
    /// Registers the store's project-entry request handlers (create, copy, delete,
    /// expand, expand-all) on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_create_project_entry);
        client.add_entity_request_handler(Self::handle_copy_project_entry);
        client.add_entity_request_handler(Self::handle_delete_project_entry);
        client.add_entity_request_handler(Self::handle_expand_project_entry);
        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
    }
  88
  89    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  90        Self {
  91            next_entry_id: Default::default(),
  92            loading_worktrees: Default::default(),
  93            downstream_client: None,
  94            worktrees: Vec::new(),
  95            worktrees_reordered: false,
  96            retain_worktrees,
  97            state: WorktreeStoreState::Local { fs },
  98        }
  99    }
 100
 101    pub fn remote(
 102        retain_worktrees: bool,
 103        upstream_client: AnyProtoClient,
 104        upstream_project_id: u64,
 105        path_style: PathStyle,
 106    ) -> Self {
 107        Self {
 108            next_entry_id: Default::default(),
 109            loading_worktrees: Default::default(),
 110            downstream_client: None,
 111            worktrees: Vec::new(),
 112            worktrees_reordered: false,
 113            retain_worktrees,
 114            state: WorktreeStoreState::Remote {
 115                upstream_client,
 116                upstream_project_id,
 117                path_style,
 118            },
 119        }
 120    }
 121
 122    /// Iterates through all worktrees, including ones that don't appear in the project panel
 123    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 124        self.worktrees
 125            .iter()
 126            .filter_map(move |worktree| worktree.upgrade())
 127    }
 128
 129    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 130    pub fn visible_worktrees<'a>(
 131        &'a self,
 132        cx: &'a App,
 133    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 134        self.worktrees()
 135            .filter(|worktree| worktree.read(cx).is_visible())
 136    }
 137
 138    /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
 139    pub fn visible_worktrees_and_single_files<'a>(
 140        &'a self,
 141        cx: &'a App,
 142    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 143        self.worktrees()
 144            .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
 145    }
 146
 147    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 148        self.worktrees()
 149            .find(|worktree| worktree.read(cx).id() == id)
 150    }
 151
 152    pub fn worktree_for_entry(
 153        &self,
 154        entry_id: ProjectEntryId,
 155        cx: &App,
 156    ) -> Option<Entity<Worktree>> {
 157        self.worktrees()
 158            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 159    }
 160
 161    pub fn find_worktree(
 162        &self,
 163        abs_path: impl AsRef<Path>,
 164        cx: &App,
 165    ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
 166        let abs_path = SanitizedPath::new(abs_path.as_ref());
 167        for tree in self.worktrees() {
 168            let path_style = tree.read(cx).path_style();
 169            if let Ok(relative_path) = abs_path.as_ref().strip_prefix(tree.read(cx).abs_path())
 170                && let Ok(relative_path) = RelPath::new(relative_path, path_style)
 171            {
 172                return Some((tree.clone(), relative_path.into_arc()));
 173            }
 174        }
 175        None
 176    }
 177
 178    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
 179        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 180        Some(worktree.read(cx).absolutize(&project_path.path))
 181    }
 182
 183    pub fn path_style(&self) -> PathStyle {
 184        match &self.state {
 185            WorktreeStoreState::Local { .. } => PathStyle::local(),
 186            WorktreeStoreState::Remote { path_style, .. } => *path_style,
 187        }
 188    }
 189
 190    pub fn find_or_create_worktree(
 191        &mut self,
 192        abs_path: impl AsRef<Path>,
 193        visible: bool,
 194        cx: &mut Context<Self>,
 195    ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
 196        let abs_path = abs_path.as_ref();
 197        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 198            Task::ready(Ok((tree, relative_path)))
 199        } else {
 200            let worktree = self.create_worktree(abs_path, visible, cx);
 201            cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
 202        }
 203    }
 204
 205    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 206        self.worktrees()
 207            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 208    }
 209
 210    pub fn worktree_and_entry_for_id<'a>(
 211        &'a self,
 212        entry_id: ProjectEntryId,
 213        cx: &'a App,
 214    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 215        self.worktrees().find_map(|worktree| {
 216            worktree
 217                .read(cx)
 218                .entry_for_id(entry_id)
 219                .map(|e| (worktree.clone(), e))
 220        })
 221    }
 222
 223    pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
 224        self.worktree_for_id(path.worktree_id, cx)?
 225            .read(cx)
 226            .entry_for_path(&path.path)
 227    }
 228
 229    pub fn copy_entry(
 230        &mut self,
 231        entry_id: ProjectEntryId,
 232        new_project_path: ProjectPath,
 233        cx: &mut Context<Self>,
 234    ) -> Task<Result<Option<Entry>>> {
 235        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 236            return Task::ready(Err(anyhow!("no such worktree")));
 237        };
 238        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
 239            return Task::ready(Err(anyhow!("no such entry")));
 240        };
 241        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 242            return Task::ready(Err(anyhow!("no such worktree")));
 243        };
 244
 245        match &self.state {
 246            WorktreeStoreState::Local { fs } => {
 247                let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
 248                let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
 249                let fs = fs.clone();
 250                let copy = cx.background_spawn(async move {
 251                    copy_recursive(
 252                        fs.as_ref(),
 253                        &old_abs_path,
 254                        &new_abs_path,
 255                        Default::default(),
 256                    )
 257                    .await
 258                });
 259
 260                cx.spawn(async move |_, cx| {
 261                    copy.await?;
 262                    new_worktree
 263                        .update(cx, |this, cx| {
 264                            this.as_local_mut().unwrap().refresh_entry(
 265                                new_project_path.path,
 266                                None,
 267                                cx,
 268                            )
 269                        })?
 270                        .await
 271                })
 272            }
 273            WorktreeStoreState::Remote {
 274                upstream_client,
 275                upstream_project_id,
 276                ..
 277            } => {
 278                let response = upstream_client.request(proto::CopyProjectEntry {
 279                    project_id: *upstream_project_id,
 280                    entry_id: entry_id.to_proto(),
 281                    new_path: new_project_path.path.to_proto(),
 282                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 283                });
 284                cx.spawn(async move |_, cx| {
 285                    let response = response.await?;
 286                    match response.entry {
 287                        Some(entry) => new_worktree
 288                            .update(cx, |worktree, cx| {
 289                                worktree.as_remote_mut().unwrap().insert_entry(
 290                                    entry,
 291                                    response.worktree_scan_id as usize,
 292                                    cx,
 293                                )
 294                            })?
 295                            .await
 296                            .map(Some),
 297                        None => Ok(None),
 298                    }
 299                })
 300            }
 301        }
 302    }
 303
 304    pub fn rename_entry(
 305        &mut self,
 306        entry_id: ProjectEntryId,
 307        new_project_path: ProjectPath,
 308        cx: &mut Context<Self>,
 309    ) -> Task<Result<CreatedEntry>> {
 310        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 311            return Task::ready(Err(anyhow!("no such worktree")));
 312        };
 313        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
 314            return Task::ready(Err(anyhow!("no such entry")));
 315        };
 316        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 317            return Task::ready(Err(anyhow!("no such worktree")));
 318        };
 319
 320        match &self.state {
 321            WorktreeStoreState::Local { fs } => {
 322                let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
 323                let new_worktree_ref = new_worktree.read(cx);
 324                let is_root_entry = new_worktree_ref
 325                    .root_entry()
 326                    .is_some_and(|e| e.id == entry_id);
 327                let abs_new_path = if is_root_entry {
 328                    let abs_path = new_worktree_ref.abs_path();
 329                    let Some(root_parent_path) = abs_path.parent() else {
 330                        return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
 331                    };
 332                    root_parent_path.join(new_project_path.path.as_std_path())
 333                } else {
 334                    new_worktree_ref.absolutize(&new_project_path.path)
 335                };
 336
 337                let fs = fs.clone();
 338                let case_sensitive = new_worktree
 339                    .read(cx)
 340                    .as_local()
 341                    .unwrap()
 342                    .fs_is_case_sensitive();
 343
 344                let do_rename =
 345                    async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
 346                        fs.rename(
 347                            &old_path,
 348                            &new_path,
 349                            fs::RenameOptions {
 350                                overwrite,
 351                                ..fs::RenameOptions::default()
 352                            },
 353                        )
 354                        .await
 355                        .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
 356                    };
 357
 358                let rename = cx.background_spawn({
 359                    let abs_new_path = abs_new_path.clone();
 360                    async move {
 361                        // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
 362                        // we want to overwrite, because otherwise we run into a file-already-exists error.
 363                        let overwrite = !case_sensitive
 364                            && abs_old_path != abs_new_path
 365                            && abs_old_path.to_str().map(|p| p.to_lowercase())
 366                                == abs_new_path.to_str().map(|p| p.to_lowercase());
 367
 368                        // The directory we're renaming into might not exist yet
 369                        if let Err(e) =
 370                            do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
 371                        {
 372                            if let Some(err) = e.downcast_ref::<std::io::Error>()
 373                                && err.kind() == std::io::ErrorKind::NotFound
 374                            {
 375                                if let Some(parent) = abs_new_path.parent() {
 376                                    fs.create_dir(parent).await.with_context(|| {
 377                                        format!("creating parent directory {parent:?}")
 378                                    })?;
 379                                    return do_rename(
 380                                        fs.as_ref(),
 381                                        &abs_old_path,
 382                                        &abs_new_path,
 383                                        overwrite,
 384                                    )
 385                                    .await;
 386                                }
 387                            }
 388                            return Err(e);
 389                        }
 390                        Ok(())
 391                    }
 392                });
 393
 394                cx.spawn(async move |_, cx| {
 395                    rename.await?;
 396                    Ok(new_worktree
 397                        .update(cx, |this, cx| {
 398                            let local = this.as_local_mut().unwrap();
 399                            if is_root_entry {
 400                                // We eagerly update `abs_path` and refresh this worktree.
 401                                // Otherwise, the FS watcher would do it on the `RootUpdated` event,
 402                                // but with a noticeable delay, so we handle it proactively.
 403                                local.update_abs_path_and_refresh(
 404                                    SanitizedPath::new_arc(&abs_new_path),
 405                                    cx,
 406                                );
 407                                Task::ready(Ok(this.root_entry().cloned()))
 408                            } else {
 409                                // First refresh the parent directory (in case it was newly created)
 410                                if let Some(parent) = new_project_path.path.parent() {
 411                                    let _ = local.refresh_entries_for_paths(vec![parent.into()]);
 412                                }
 413                                // Then refresh the new path
 414                                local.refresh_entry(
 415                                    new_project_path.path.clone(),
 416                                    Some(old_entry.path),
 417                                    cx,
 418                                )
 419                            }
 420                        })?
 421                        .await?
 422                        .map(CreatedEntry::Included)
 423                        .unwrap_or_else(|| CreatedEntry::Excluded {
 424                            abs_path: abs_new_path,
 425                        }))
 426                })
 427            }
 428            WorktreeStoreState::Remote {
 429                upstream_client,
 430                upstream_project_id,
 431                ..
 432            } => {
 433                let response = upstream_client.request(proto::RenameProjectEntry {
 434                    project_id: *upstream_project_id,
 435                    entry_id: entry_id.to_proto(),
 436                    new_path: new_project_path.path.to_proto(),
 437                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 438                });
 439                cx.spawn(async move |_, cx| {
 440                    let response = response.await?;
 441                    match response.entry {
 442                        Some(entry) => new_worktree
 443                            .update(cx, |worktree, cx| {
 444                                worktree.as_remote_mut().unwrap().insert_entry(
 445                                    entry,
 446                                    response.worktree_scan_id as usize,
 447                                    cx,
 448                                )
 449                            })?
 450                            .await
 451                            .map(CreatedEntry::Included),
 452                        None => {
 453                            let abs_path = new_worktree.read_with(cx, |worktree, _| {
 454                                worktree.absolutize(&new_project_path.path)
 455                            })?;
 456                            Ok(CreatedEntry::Excluded { abs_path })
 457                        }
 458                    }
 459                })
 460            }
 461        }
 462    }
 463    pub fn create_worktree(
 464        &mut self,
 465        abs_path: impl AsRef<Path>,
 466        visible: bool,
 467        cx: &mut Context<Self>,
 468    ) -> Task<Result<Entity<Worktree>>> {
 469        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
 470        if !self.loading_worktrees.contains_key(&abs_path) {
 471            let task = match &self.state {
 472                WorktreeStoreState::Remote {
 473                    upstream_client,
 474                    path_style,
 475                    ..
 476                } => {
 477                    if upstream_client.is_via_collab() {
 478                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 479                    } else {
 480                        let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
 481                        self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
 482                    }
 483                }
 484                WorktreeStoreState::Local { fs } => {
 485                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 486                }
 487            };
 488
 489            self.loading_worktrees
 490                .insert(abs_path.clone(), task.shared());
 491        }
 492        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 493        cx.spawn(async move |this, cx| {
 494            let result = task.await;
 495            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
 496                .ok();
 497            match result {
 498                Ok(worktree) => Ok(worktree),
 499                Err(err) => Err((*err).cloned()),
 500            }
 501        })
 502    }
 503
 504    fn create_remote_worktree(
 505        &mut self,
 506        client: AnyProtoClient,
 507        abs_path: RemotePathBuf,
 508        visible: bool,
 509        cx: &mut Context<Self>,
 510    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 511        let path_style = abs_path.path_style();
 512        let mut abs_path = abs_path.to_string();
 513        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 514        // in which case want to strip the leading the `/`.
 515        // On the host-side, the `~` will get expanded.
 516        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 517        if abs_path.starts_with("/~") {
 518            abs_path = abs_path[1..].to_string();
 519        }
 520        if abs_path.is_empty() {
 521            abs_path = "~/".to_string();
 522        }
 523
 524        cx.spawn(async move |this, cx| {
 525            let this = this.upgrade().context("Dropped worktree store")?;
 526
 527            let path = RemotePathBuf::new(abs_path, path_style);
 528            let response = client
 529                .request(proto::AddWorktree {
 530                    project_id: REMOTE_SERVER_PROJECT_ID,
 531                    path: path.to_proto(),
 532                    visible,
 533                })
 534                .await?;
 535
 536            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 537                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 538            })? {
 539                return Ok(existing_worktree);
 540            }
 541
 542            let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
 543            let root_name = root_path_buf
 544                .file_name()
 545                .map(|n| n.to_string_lossy().into_owned())
 546                .unwrap_or(root_path_buf.to_string_lossy().into_owned());
 547
 548            let worktree = cx.update(|cx| {
 549                Worktree::remote(
 550                    REMOTE_SERVER_PROJECT_ID,
 551                    ReplicaId::REMOTE_SERVER,
 552                    proto::WorktreeMetadata {
 553                        id: response.worktree_id,
 554                        root_name,
 555                        visible,
 556                        abs_path: response.canonicalized_path,
 557                    },
 558                    client,
 559                    path_style,
 560                    cx,
 561                )
 562            })?;
 563
 564            this.update(cx, |this, cx| {
 565                this.add(&worktree, cx);
 566            })?;
 567            Ok(worktree)
 568        })
 569    }
 570
 571    fn create_local_worktree(
 572        &mut self,
 573        fs: Arc<dyn Fs>,
 574        abs_path: Arc<SanitizedPath>,
 575        visible: bool,
 576        cx: &mut Context<Self>,
 577    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 578        let next_entry_id = self.next_entry_id.clone();
 579
 580        cx.spawn(async move |this, cx| {
 581            let worktree = Worktree::local(
 582                SanitizedPath::cast_arc(abs_path.clone()),
 583                visible,
 584                fs,
 585                next_entry_id,
 586                cx,
 587            )
 588            .await;
 589
 590            let worktree = worktree?;
 591
 592            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 593
 594            if visible {
 595                cx.update(|cx| {
 596                    cx.add_recent_document(abs_path.as_path());
 597                })
 598                .log_err();
 599            }
 600
 601            Ok(worktree)
 602        })
 603    }
 604
 605    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 606        let worktree_id = worktree.read(cx).id();
 607        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 608
 609        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 610        let handle = if push_strong_handle {
 611            WorktreeHandle::Strong(worktree.clone())
 612        } else {
 613            WorktreeHandle::Weak(worktree.downgrade())
 614        };
 615        if self.worktrees_reordered {
 616            self.worktrees.push(handle);
 617        } else {
 618            let i = match self
 619                .worktrees
 620                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 621                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 622                }) {
 623                Ok(i) | Err(i) => i,
 624            };
 625            self.worktrees.insert(i, handle);
 626        }
 627
 628        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 629        self.send_project_updates(cx);
 630
 631        let handle_id = worktree.entity_id();
 632        cx.subscribe(worktree, |_, worktree, event, cx| {
 633            let worktree_id = worktree.read(cx).id();
 634            match event {
 635                worktree::Event::UpdatedEntries(changes) => {
 636                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 637                        worktree_id,
 638                        changes.clone(),
 639                    ));
 640                }
 641                worktree::Event::UpdatedGitRepositories(set) => {
 642                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 643                        worktree_id,
 644                        set.clone(),
 645                    ));
 646                }
 647                worktree::Event::DeletedEntry(id) => {
 648                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 649                }
 650            }
 651        })
 652        .detach();
 653        cx.observe_release(worktree, move |this, worktree, cx| {
 654            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 655                handle_id,
 656                worktree.id(),
 657            ));
 658            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 659                handle_id,
 660                worktree.id(),
 661            ));
 662            this.send_project_updates(cx);
 663        })
 664        .detach();
 665    }
 666
 667    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 668        self.worktrees.retain(|worktree| {
 669            if let Some(worktree) = worktree.upgrade() {
 670                if worktree.read(cx).id() == id_to_remove {
 671                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 672                        worktree.entity_id(),
 673                        id_to_remove,
 674                    ));
 675                    false
 676                } else {
 677                    true
 678                }
 679            } else {
 680                false
 681            }
 682        });
 683        self.send_project_updates(cx);
 684    }
 685
    /// Sets whether the worktrees are considered manually reordered. While the flag
    /// is set, `add` appends new worktrees instead of inserting them by `abs_path`.
    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }
 689
 690    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 691        match &self.state {
 692            WorktreeStoreState::Remote {
 693                upstream_client,
 694                upstream_project_id,
 695                ..
 696            } => Some((upstream_client.clone(), *upstream_project_id)),
 697            WorktreeStoreState::Local { .. } => None,
 698        }
 699    }
 700
 701    pub fn set_worktrees_from_proto(
 702        &mut self,
 703        worktrees: Vec<proto::WorktreeMetadata>,
 704        replica_id: ReplicaId,
 705        cx: &mut Context<Self>,
 706    ) -> Result<()> {
 707        let mut old_worktrees_by_id = self
 708            .worktrees
 709            .drain(..)
 710            .filter_map(|worktree| {
 711                let worktree = worktree.upgrade()?;
 712                Some((worktree.read(cx).id(), worktree))
 713            })
 714            .collect::<HashMap<_, _>>();
 715
 716        let (client, project_id) = self.upstream_client().context("invalid project")?;
 717
 718        for worktree in worktrees {
 719            if let Some(old_worktree) =
 720                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 721            {
 722                let push_strong_handle =
 723                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 724                let handle = if push_strong_handle {
 725                    WorktreeHandle::Strong(old_worktree.clone())
 726                } else {
 727                    WorktreeHandle::Weak(old_worktree.downgrade())
 728                };
 729                self.worktrees.push(handle);
 730            } else {
 731                self.add(
 732                    &Worktree::remote(
 733                        project_id,
 734                        replica_id,
 735                        worktree,
 736                        client.clone(),
 737                        self.path_style(),
 738                        cx,
 739                    ),
 740                    cx,
 741                );
 742            }
 743        }
 744        self.send_project_updates(cx);
 745
 746        Ok(())
 747    }
 748
 749    pub fn move_worktree(
 750        &mut self,
 751        source: WorktreeId,
 752        destination: WorktreeId,
 753        cx: &mut Context<Self>,
 754    ) -> Result<()> {
 755        if source == destination {
 756            return Ok(());
 757        }
 758
 759        let mut source_index = None;
 760        let mut destination_index = None;
 761        for (i, worktree) in self.worktrees.iter().enumerate() {
 762            if let Some(worktree) = worktree.upgrade() {
 763                let worktree_id = worktree.read(cx).id();
 764                if worktree_id == source {
 765                    source_index = Some(i);
 766                    if destination_index.is_some() {
 767                        break;
 768                    }
 769                } else if worktree_id == destination {
 770                    destination_index = Some(i);
 771                    if source_index.is_some() {
 772                        break;
 773                    }
 774                }
 775            }
 776        }
 777
 778        let source_index =
 779            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 780        let destination_index =
 781            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 782
 783        if source_index == destination_index {
 784            return Ok(());
 785        }
 786
 787        let worktree_to_move = self.worktrees.remove(source_index);
 788        self.worktrees.insert(destination_index, worktree_to_move);
 789        self.worktrees_reordered = true;
 790        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 791        cx.notify();
 792        Ok(())
 793    }
 794
 795    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 796        for worktree in &self.worktrees {
 797            if let Some(worktree) = worktree.upgrade() {
 798                worktree.update(cx, |worktree, _| {
 799                    if let Some(worktree) = worktree.as_remote_mut() {
 800                        worktree.disconnected_from_host();
 801                    }
 802                });
 803            }
 804        }
 805    }
 806
 807    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 808        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 809            return;
 810        };
 811
 812        let update = proto::UpdateProject {
 813            project_id,
 814            worktrees: self.worktree_metadata_protos(cx),
 815        };
 816
 817        // collab has bad concurrency guarantees, so we send requests in serial.
 818        let update_project = if downstream_client.is_via_collab() {
 819            Some(downstream_client.request(update))
 820        } else {
 821            downstream_client.send(update).log_err();
 822            None
 823        };
 824        cx.spawn(async move |this, cx| {
 825            if let Some(update_project) = update_project {
 826                update_project.await?;
 827            }
 828
 829            this.update(cx, |this, cx| {
 830                let worktrees = this.worktrees().collect::<Vec<_>>();
 831
 832                for worktree in worktrees {
 833                    worktree.update(cx, |worktree, cx| {
 834                        let client = downstream_client.clone();
 835                        worktree.observe_updates(project_id, cx, {
 836                            move |update| {
 837                                let client = client.clone();
 838                                async move {
 839                                    if client.is_via_collab() {
 840                                        client
 841                                            .request(update)
 842                                            .map(|result| result.log_err().is_some())
 843                                            .await
 844                                    } else {
 845                                        client.send(update).log_err().is_some()
 846                                    }
 847                                }
 848                            }
 849                        });
 850                    });
 851
 852                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 853                }
 854
 855                anyhow::Ok(())
 856            })
 857        })
 858        .detach_and_log_err(cx);
 859    }
 860
 861    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
 862        self.worktrees()
 863            .map(|worktree| {
 864                let worktree = worktree.read(cx);
 865                proto::WorktreeMetadata {
 866                    id: worktree.id().to_proto(),
 867                    root_name: worktree.root_name_str().to_owned(),
 868                    visible: worktree.is_visible(),
 869                    abs_path: worktree.abs_path().to_string_lossy().into_owned(),
 870                }
 871            })
 872            .collect()
 873    }
 874
 875    pub fn shared(
 876        &mut self,
 877        remote_id: u64,
 878        downstream_client: AnyProtoClient,
 879        cx: &mut Context<Self>,
 880    ) {
 881        self.retain_worktrees = true;
 882        self.downstream_client = Some((downstream_client, remote_id));
 883
 884        // When shared, retain all worktrees
 885        for worktree_handle in self.worktrees.iter_mut() {
 886            match worktree_handle {
 887                WorktreeHandle::Strong(_) => {}
 888                WorktreeHandle::Weak(worktree) => {
 889                    if let Some(worktree) = worktree.upgrade() {
 890                        *worktree_handle = WorktreeHandle::Strong(worktree);
 891                    }
 892                }
 893            }
 894        }
 895        self.send_project_updates(cx);
 896    }
 897
 898    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 899        self.retain_worktrees = false;
 900        self.downstream_client.take();
 901
 902        // When not shared, only retain the visible worktrees
 903        for worktree_handle in self.worktrees.iter_mut() {
 904            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 905                let is_visible = worktree.update(cx, |worktree, _| {
 906                    worktree.stop_observing_updates();
 907                    worktree.is_visible()
 908                });
 909                if !is_visible {
 910                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 911                }
 912            }
 913        }
 914    }
 915
 916    /// search over all worktrees and return buffers that *might* match the search.
 917    pub fn find_search_candidates(
 918        &self,
 919        query: SearchQuery,
 920        limit: usize,
 921        open_entries: HashSet<ProjectEntryId>,
 922        fs: Arc<dyn Fs>,
 923        cx: &Context<Self>,
 924    ) -> Receiver<ProjectPath> {
 925        let snapshots = self
 926            .visible_worktrees(cx)
 927            .filter_map(|tree| {
 928                let tree = tree.read(cx);
 929                Some((tree.snapshot(), tree.as_local()?.settings()))
 930            })
 931            .collect::<Vec<_>>();
 932
 933        let executor = cx.background_executor().clone();
 934
 935        // We want to return entries in the order they are in the worktrees, so we have one
 936        // thread that iterates over the worktrees (and ignored directories) as necessary,
 937        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 938        // channel.
 939        // We spawn a number of workers that take items from the filter channel and check the query
 940        // against the version of the file on disk.
 941        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 942        let (output_tx, output_rx) = smol::channel::bounded(64);
 943        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 944
 945        let input = cx.background_spawn({
 946            let fs = fs.clone();
 947            let query = query.clone();
 948            async move {
 949                Self::find_candidate_paths(
 950                    fs,
 951                    snapshots,
 952                    open_entries,
 953                    query,
 954                    filter_tx,
 955                    output_tx,
 956                )
 957                .await
 958                .log_err();
 959            }
 960        });
 961        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 962        let filters = cx.background_spawn(async move {
 963            let fs = &fs;
 964            let query = &query;
 965            executor
 966                .scoped(move |scope| {
 967                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 968                        let filter_rx = filter_rx.clone();
 969                        scope.spawn(async move {
 970                            Self::filter_paths(fs, filter_rx, query)
 971                                .await
 972                                .log_with_level(log::Level::Debug);
 973                        })
 974                    }
 975                })
 976                .await;
 977        });
 978        cx.background_spawn(async move {
 979            let mut matched = 0;
 980            while let Ok(mut receiver) = output_rx.recv().await {
 981                let Some(path) = receiver.next().await else {
 982                    continue;
 983                };
 984                let Ok(_) = matching_paths_tx.send(path).await else {
 985                    break;
 986                };
 987                matched += 1;
 988                if matched == limit {
 989                    break;
 990                }
 991            }
 992            drop(input);
 993            drop(filters);
 994        })
 995        .detach();
 996        matching_paths_rx
 997    }
 998
 999    async fn find_candidate_paths(
1000        _: Arc<dyn Fs>,
1001        _: Vec<(worktree::Snapshot, WorktreeSettings)>,
1002        _: HashSet<ProjectEntryId>,
1003        _: SearchQuery,
1004        _: Sender<MatchingEntry>,
1005        _: Sender<oneshot::Receiver<ProjectPath>>,
1006    ) -> Result<()> {
1007        Ok(())
1008    }
1009
1010    async fn filter_paths(
1011        fs: &Arc<dyn Fs>,
1012        input: Receiver<MatchingEntry>,
1013        query: &SearchQuery,
1014    ) -> Result<()> {
1015        let mut input = pin!(input);
1016        while let Some(mut entry) = input.next().await {
1017            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
1018            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
1019                continue;
1020            };
1021
1022            let mut file = BufReader::new(file);
1023            let file_start = file.fill_buf()?;
1024
1025            if let Err(Some(starting_position)) =
1026                std::str::from_utf8(file_start).map_err(|e| e.error_len())
1027            {
1028                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1029                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1030                log::debug!(
1031                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1032                );
1033                continue;
1034            }
1035
1036            if query.detect(file).unwrap_or(false) {
1037                entry.respond.send(entry.path).await?
1038            }
1039        }
1040
1041        Ok(())
1042    }
1043
1044    pub async fn handle_create_project_entry(
1045        this: Entity<Self>,
1046        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1047        mut cx: AsyncApp,
1048    ) -> Result<proto::ProjectEntryResponse> {
1049        let worktree = this.update(&mut cx, |this, cx| {
1050            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1051            this.worktree_for_id(worktree_id, cx)
1052                .context("worktree not found")
1053        })??;
1054        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1055    }
1056
1057    pub async fn handle_copy_project_entry(
1058        this: Entity<Self>,
1059        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1060        mut cx: AsyncApp,
1061    ) -> Result<proto::ProjectEntryResponse> {
1062        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1063        let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1064        let new_project_path = (
1065            new_worktree_id,
1066            RelPath::from_proto(&envelope.payload.new_path)?,
1067        );
1068        let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1069            let Some((_, project_id)) = this.downstream_client else {
1070                bail!("no downstream client")
1071            };
1072            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1073                bail!("no such entry");
1074            };
1075            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1076                bail!("entry is private")
1077            }
1078
1079            let new_worktree = this
1080                .worktree_for_id(new_worktree_id, cx)
1081                .context("no such worktree")?;
1082            let scan_id = new_worktree.read(cx).scan_id();
1083            anyhow::Ok((
1084                scan_id,
1085                this.copy_entry(entry_id, new_project_path.into(), cx),
1086            ))
1087        })??;
1088        let entry = entry.await?;
1089        Ok(proto::ProjectEntryResponse {
1090            entry: entry.as_ref().map(|entry| entry.into()),
1091            worktree_scan_id: scan_id as u64,
1092        })
1093    }
1094
1095    pub async fn handle_delete_project_entry(
1096        this: Entity<Self>,
1097        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1098        mut cx: AsyncApp,
1099    ) -> Result<proto::ProjectEntryResponse> {
1100        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1101        let worktree = this.update(&mut cx, |this, cx| {
1102            let Some((_, project_id)) = this.downstream_client else {
1103                bail!("no downstream client")
1104            };
1105            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1106                bail!("no entry")
1107            };
1108            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1109                bail!("entry is private")
1110            }
1111            this.worktree_for_entry(entry_id, cx)
1112                .context("worktree not found")
1113        })??;
1114        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1115    }
1116
1117    pub async fn handle_rename_project_entry(
1118        this: Entity<Self>,
1119        request: proto::RenameProjectEntry,
1120        mut cx: AsyncApp,
1121    ) -> Result<proto::ProjectEntryResponse> {
1122        let entry_id = ProjectEntryId::from_proto(request.entry_id);
1123        let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1124        let rel_path = RelPath::from_proto(&request.new_path)
1125            .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1126
1127        let (scan_id, task) = this.update(&mut cx, |this, cx| {
1128            let worktree = this
1129                .worktree_for_entry(entry_id, cx)
1130                .context("no such worktree")?;
1131
1132            let Some((_, project_id)) = this.downstream_client else {
1133                bail!("no downstream client")
1134            };
1135            let entry = worktree
1136                .read(cx)
1137                .entry_for_id(entry_id)
1138                .ok_or_else(|| anyhow!("missing entry"))?;
1139            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1140                bail!("entry is private")
1141            }
1142
1143            let scan_id = worktree.read(cx).scan_id();
1144            anyhow::Ok((
1145                scan_id,
1146                this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1147            ))
1148        })??;
1149        Ok(proto::ProjectEntryResponse {
1150            entry: match &task.await? {
1151                CreatedEntry::Included(entry) => Some(entry.into()),
1152                CreatedEntry::Excluded { .. } => None,
1153            },
1154            worktree_scan_id: scan_id as u64,
1155        })
1156    }
1157
1158    pub async fn handle_expand_project_entry(
1159        this: Entity<Self>,
1160        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1161        mut cx: AsyncApp,
1162    ) -> Result<proto::ExpandProjectEntryResponse> {
1163        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1164        let worktree = this
1165            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1166            .context("invalid request")?;
1167        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1168    }
1169
1170    pub async fn handle_expand_all_for_project_entry(
1171        this: Entity<Self>,
1172        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1173        mut cx: AsyncApp,
1174    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1175        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1176        let worktree = this
1177            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1178            .context("invalid request")?;
1179        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1180    }
1181
1182    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1183        match &self.state {
1184            WorktreeStoreState::Local { fs } => Some(fs.clone()),
1185            WorktreeStoreState::Remote { .. } => None,
1186        }
1187    }
1188}
1189
1190#[derive(Clone, Debug)]
1191enum WorktreeHandle {
1192    Strong(Entity<Worktree>),
1193    Weak(WeakEntity<Worktree>),
1194}
1195
1196impl WorktreeHandle {
1197    fn upgrade(&self) -> Option<Entity<Worktree>> {
1198        match self {
1199            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1200            WorktreeHandle::Weak(handle) => handle.upgrade(),
1201        }
1202    }
1203}