worktree_store.rs

   1use std::{
   2    path::{Path, PathBuf},
   3    pin::pin,
   4    sync::{Arc, atomic::AtomicUsize},
   5};
   6
   7use anyhow::{Context as _, Result, anyhow, bail};
   8use collections::{HashMap, HashSet};
   9use fs::{Fs, copy_recursive};
  10use futures::{AsyncBufReadExt as _, FutureExt, SinkExt, future::Shared, io::BufReader};
  11use gpui::{
  12    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
  13};
  14use postage::oneshot;
  15use rpc::{
  16    AnyProtoClient, ErrorExt, TypedEnvelope,
  17    proto::{self, REMOTE_SERVER_PROJECT_ID},
  18};
  19use smol::{
  20    channel::{Receiver, Sender},
  21    stream::StreamExt,
  22};
  23use text::ReplicaId;
  24use util::{
  25    ResultExt,
  26    paths::{PathStyle, RemotePathBuf, SanitizedPath},
  27    rel_path::RelPath,
  28};
  29use worktree::{
  30    CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
  31    WorktreeId, WorktreeSettings,
  32};
  33
  34use crate::{ProjectPath, search::SearchQuery};
  35
/// A candidate path paired with a one-shot channel for reporting it back.
///
/// NOTE(review): nothing near this definition constructs the struct, so the field
/// descriptions below are inferred from names and types — confirm against its users.
struct MatchingEntry {
    /// Presumably the absolute root of the worktree that contains `path`.
    worktree_root: Arc<Path>,
    /// Worktree-relative location of the candidate.
    path: ProjectPath,
    /// Sender half of a `postage::oneshot` channel that carries a `ProjectPath`.
    respond: oneshot::Sender<ProjectPath>,
}
  41
/// Where the store's worktrees live, which decides how entry operations are carried out.
enum WorktreeStoreState {
    /// Worktrees are backed by a filesystem this process accesses directly.
    Local {
        /// Filesystem used to create worktrees and to copy or rename entries.
        fs: Arc<dyn Fs>,
    },
    /// Worktrees are owned by an upstream peer; entry operations are sent to it as requests.
    Remote {
        /// Client used to send requests (e.g. `CopyProjectEntry`, `AddWorktree`) upstream.
        upstream_client: AnyProtoClient,
        /// Project id placed in the entry requests sent through `upstream_client`.
        upstream_project_id: u64,
        /// Path conventions reported by `WorktreeStore::path_style` for this store.
        path_style: PathStyle,
    },
}
  52
/// Owns the set of worktrees that make up a project and mediates entry-level operations
/// (copy, rename, create, remove) on them, either locally or through an upstream client.
pub struct WorktreeStore {
    /// Shared counter handed to every local worktree this store creates.
    next_entry_id: Arc<AtomicUsize>,
    /// Optional downstream client and the project id that accompanies it.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// When true, every worktree is held strongly; otherwise only visible ones are.
    retain_worktrees: bool,
    /// Handles to the tracked worktrees. Kept ordered by absolute path by `add` until
    /// `worktrees_reordered` is set.
    worktrees: Vec<WorktreeHandle>,
    /// Set once the order has been customized; `add` then appends instead of sorting in.
    worktrees_reordered: bool,
    /// Passed to `Worktree::local` when a local worktree is created.
    scanning_enabled: bool,
    /// In-flight worktree creations keyed by sanitized path, so that concurrent
    /// `create_worktree` calls for the same path share one task.
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
    /// Local filesystem or upstream connection backing this store.
    state: WorktreeStoreState,
}
  65
/// Events emitted by [`WorktreeStore`].
#[derive(Debug)]
pub enum WorktreeStoreEvent {
    /// Emitted by `WorktreeStore::add` once the worktree has been stored.
    WorktreeAdded(Entity<Worktree>),
    /// Emitted when a worktree is removed via `remove_worktree`, and also when a tracked
    /// worktree entity is released.
    WorktreeRemoved(EntityId, WorktreeId),
    /// Emitted when a tracked worktree entity is released; a `WorktreeRemoved` for the same
    /// worktree is emitted right after it.
    WorktreeReleased(EntityId, WorktreeId),
    /// Emitted by `move_worktree` after a worktree changed position.
    WorktreeOrderChanged,
    /// Not emitted by any code near this definition; presumably signals that an update for
    /// the worktree was sent to a downstream client — TODO confirm.
    WorktreeUpdateSent(Entity<Worktree>),
    /// Forwarded from `worktree::Event::UpdatedEntries`, tagged with the worktree's id.
    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
    /// Forwarded from `worktree::Event::UpdatedGitRepositories`, tagged with the worktree's id.
    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
    /// Forwarded from `worktree::Event::DeletedEntry`, tagged with the worktree's id.
    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
}
  77
// Marks `WorktreeStore` as a source of `WorktreeStoreEvent`s (emitted with `cx.emit`).
impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  79
  80impl WorktreeStore {
    /// Registers this store's project-entry request handlers on `client`.
    ///
    /// Covers the create, copy, delete, expand and expand-all entry requests. The handlers
    /// are associated functions of `WorktreeStore`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_create_project_entry);
        client.add_entity_request_handler(Self::handle_copy_project_entry);
        client.add_entity_request_handler(Self::handle_delete_project_entry);
        client.add_entity_request_handler(Self::handle_expand_project_entry);
        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
    }
  88
  89    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  90        Self {
  91            next_entry_id: Default::default(),
  92            loading_worktrees: Default::default(),
  93            downstream_client: None,
  94            worktrees: Vec::new(),
  95            worktrees_reordered: false,
  96            scanning_enabled: true,
  97            retain_worktrees,
  98            state: WorktreeStoreState::Local { fs },
  99        }
 100    }
 101
 102    pub fn remote(
 103        retain_worktrees: bool,
 104        upstream_client: AnyProtoClient,
 105        upstream_project_id: u64,
 106        path_style: PathStyle,
 107    ) -> Self {
 108        Self {
 109            next_entry_id: Default::default(),
 110            loading_worktrees: Default::default(),
 111            downstream_client: None,
 112            worktrees: Vec::new(),
 113            worktrees_reordered: false,
 114            scanning_enabled: true,
 115            retain_worktrees,
 116            state: WorktreeStoreState::Remote {
 117                upstream_client,
 118                upstream_project_id,
 119                path_style,
 120            },
 121        }
 122    }
 123
    /// Clears the scanning flag that is passed to `Worktree::local` for local worktrees
    /// created from now on. Worktrees that already exist are not touched by this method.
    pub fn disable_scanner(&mut self) {
        self.scanning_enabled = false;
    }
 127
 128    /// Iterates through all worktrees, including ones that don't appear in the project panel
 129    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 130        self.worktrees
 131            .iter()
 132            .filter_map(move |worktree| worktree.upgrade())
 133    }
 134
 135    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 136    pub fn visible_worktrees<'a>(
 137        &'a self,
 138        cx: &'a App,
 139    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 140        self.worktrees()
 141            .filter(|worktree| worktree.read(cx).is_visible())
 142    }
 143
 144    /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
 145    pub fn visible_worktrees_and_single_files<'a>(
 146        &'a self,
 147        cx: &'a App,
 148    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 149        self.worktrees()
 150            .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
 151    }
 152
 153    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 154        self.worktrees()
 155            .find(|worktree| worktree.read(cx).id() == id)
 156    }
 157
 158    pub fn worktree_for_entry(
 159        &self,
 160        entry_id: ProjectEntryId,
 161        cx: &App,
 162    ) -> Option<Entity<Worktree>> {
 163        self.worktrees()
 164            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 165    }
 166
 167    pub fn find_worktree(
 168        &self,
 169        abs_path: impl AsRef<Path>,
 170        cx: &App,
 171    ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
 172        let abs_path = SanitizedPath::new(abs_path.as_ref());
 173        for tree in self.worktrees() {
 174            let path_style = tree.read(cx).path_style();
 175            if let Ok(relative_path) = abs_path.as_ref().strip_prefix(tree.read(cx).abs_path())
 176                && let Ok(relative_path) = RelPath::new(relative_path, path_style)
 177            {
 178                return Some((tree.clone(), relative_path.into_arc()));
 179            }
 180        }
 181        None
 182    }
 183
 184    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
 185        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 186        Some(worktree.read(cx).absolutize(&project_path.path))
 187    }
 188
 189    pub fn path_style(&self) -> PathStyle {
 190        match &self.state {
 191            WorktreeStoreState::Local { .. } => PathStyle::local(),
 192            WorktreeStoreState::Remote { path_style, .. } => *path_style,
 193        }
 194    }
 195
 196    pub fn find_or_create_worktree(
 197        &mut self,
 198        abs_path: impl AsRef<Path>,
 199        visible: bool,
 200        cx: &mut Context<Self>,
 201    ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
 202        let abs_path = abs_path.as_ref();
 203        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 204            Task::ready(Ok((tree, relative_path)))
 205        } else {
 206            let worktree = self.create_worktree(abs_path, visible, cx);
 207            cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
 208        }
 209    }
 210
 211    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 212        self.worktrees()
 213            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 214    }
 215
 216    pub fn worktree_and_entry_for_id<'a>(
 217        &'a self,
 218        entry_id: ProjectEntryId,
 219        cx: &'a App,
 220    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 221        self.worktrees().find_map(|worktree| {
 222            worktree
 223                .read(cx)
 224                .entry_for_id(entry_id)
 225                .map(|e| (worktree.clone(), e))
 226        })
 227    }
 228
 229    pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
 230        self.worktree_for_id(path.worktree_id, cx)?
 231            .read(cx)
 232            .entry_for_path(&path.path)
 233    }
 234
 235    pub fn copy_entry(
 236        &mut self,
 237        entry_id: ProjectEntryId,
 238        new_project_path: ProjectPath,
 239        cx: &mut Context<Self>,
 240    ) -> Task<Result<Option<Entry>>> {
 241        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 242            return Task::ready(Err(anyhow!("no such worktree")));
 243        };
 244        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
 245            return Task::ready(Err(anyhow!("no such entry")));
 246        };
 247        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 248            return Task::ready(Err(anyhow!("no such worktree")));
 249        };
 250
 251        match &self.state {
 252            WorktreeStoreState::Local { fs } => {
 253                let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
 254                let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
 255                let fs = fs.clone();
 256                let copy = cx.background_spawn(async move {
 257                    copy_recursive(
 258                        fs.as_ref(),
 259                        &old_abs_path,
 260                        &new_abs_path,
 261                        Default::default(),
 262                    )
 263                    .await
 264                });
 265
 266                cx.spawn(async move |_, cx| {
 267                    copy.await?;
 268                    new_worktree
 269                        .update(cx, |this, cx| {
 270                            this.as_local_mut().unwrap().refresh_entry(
 271                                new_project_path.path,
 272                                None,
 273                                cx,
 274                            )
 275                        })?
 276                        .await
 277                })
 278            }
 279            WorktreeStoreState::Remote {
 280                upstream_client,
 281                upstream_project_id,
 282                ..
 283            } => {
 284                let response = upstream_client.request(proto::CopyProjectEntry {
 285                    project_id: *upstream_project_id,
 286                    entry_id: entry_id.to_proto(),
 287                    new_path: new_project_path.path.to_proto(),
 288                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 289                });
 290                cx.spawn(async move |_, cx| {
 291                    let response = response.await?;
 292                    match response.entry {
 293                        Some(entry) => new_worktree
 294                            .update(cx, |worktree, cx| {
 295                                worktree.as_remote_mut().unwrap().insert_entry(
 296                                    entry,
 297                                    response.worktree_scan_id as usize,
 298                                    cx,
 299                                )
 300                            })?
 301                            .await
 302                            .map(Some),
 303                        None => Ok(None),
 304                    }
 305                })
 306            }
 307        }
 308    }
 309
    /// Renames (moves) the entry `entry_id` to `new_project_path`.
    ///
    /// * Local store: the rename runs on a background task. If the first attempt fails with
    ///   `NotFound`, the destination's parent directory is created and the rename is retried
    ///   once. Afterwards the destination worktree is refreshed.
    /// * Remote store: a `RenameProjectEntry` request is sent upstream and the entry in the
    ///   response is inserted into the destination worktree.
    ///
    /// Resolves to `CreatedEntry::Included` when an entry results from the refresh/response,
    /// and to `CreatedEntry::Excluded` carrying the new absolute path otherwise.
    ///
    /// The returned task fails with "no such worktree" or "no such entry" when the lookups
    /// fail, and with "no parent for path ..." when a root entry is renamed but the worktree
    /// path has no parent.
    ///
    /// # Panics
    ///
    /// Unwraps `as_local()` / `as_local_mut()` / `as_remote_mut()` on the destination
    /// worktree, so it panics if that worktree does not match the store's state.
    pub fn rename_entry(
        &mut self,
        entry_id: ProjectEntryId,
        new_project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<CreatedEntry>> {
        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
            return Task::ready(Err(anyhow!("no such entry")));
        };
        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        match &self.state {
            WorktreeStoreState::Local { fs } => {
                let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
                let new_worktree_ref = new_worktree.read(cx);
                let is_root_entry = new_worktree_ref
                    .root_entry()
                    .is_some_and(|e| e.id == entry_id);
                // Renaming the worktree root itself: the new path is resolved against the
                // root's parent directory instead of against the root.
                let abs_new_path = if is_root_entry {
                    let abs_path = new_worktree_ref.abs_path();
                    let Some(root_parent_path) = abs_path.parent() else {
                        return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
                    };
                    root_parent_path.join(new_project_path.path.as_std_path())
                } else {
                    new_worktree_ref.absolutize(&new_project_path.path)
                };

                let fs = fs.clone();
                let case_sensitive = new_worktree
                    .read(cx)
                    .as_local()
                    .unwrap()
                    .fs_is_case_sensitive();

                // Single rename attempt; the error is annotated with both paths.
                let do_rename =
                    async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
                        fs.rename(
                            &old_path,
                            &new_path,
                            fs::RenameOptions {
                                overwrite,
                                ..fs::RenameOptions::default()
                            },
                        )
                        .await
                        .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
                    };

                let rename = cx.background_spawn({
                    let abs_new_path = abs_new_path.clone();
                    async move {
                        // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
                        // we want to overwrite, because otherwise we run into a file-already-exists error.
                        // NOTE(review): `to_str()` yields `None` for non-UTF-8 paths, and
                        // `None == None` holds, so two different non-UTF-8 paths are treated
                        // as a case-only rename here — confirm this is acceptable.
                        let overwrite = !case_sensitive
                            && abs_old_path != abs_new_path
                            && abs_old_path.to_str().map(|p| p.to_lowercase())
                                == abs_new_path.to_str().map(|p| p.to_lowercase());

                        // The directory we're renaming into might not exist yet
                        if let Err(e) =
                            do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
                        {
                            if let Some(err) = e.downcast_ref::<std::io::Error>()
                                && err.kind() == std::io::ErrorKind::NotFound
                            {
                                if let Some(parent) = abs_new_path.parent() {
                                    fs.create_dir(parent).await.with_context(|| {
                                        format!("creating parent directory {parent:?}")
                                    })?;
                                    // Retry exactly once now that the parent exists.
                                    return do_rename(
                                        fs.as_ref(),
                                        &abs_old_path,
                                        &abs_new_path,
                                        overwrite,
                                    )
                                    .await;
                                }
                            }
                            return Err(e);
                        }
                        Ok(())
                    }
                });

                cx.spawn(async move |_, cx| {
                    rename.await?;
                    Ok(new_worktree
                        .update(cx, |this, cx| {
                            let local = this.as_local_mut().unwrap();
                            if is_root_entry {
                                // We eagerly update `abs_path` and refresh this worktree.
                                // Otherwise, the FS watcher would do it on the `RootUpdated` event,
                                // but with a noticeable delay, so we handle it proactively.
                                local.update_abs_path_and_refresh(
                                    SanitizedPath::new_arc(&abs_new_path),
                                    cx,
                                );
                                Task::ready(Ok(this.root_entry().cloned()))
                            } else {
                                // First refresh the parent directory (in case it was newly created)
                                if let Some(parent) = new_project_path.path.parent() {
                                    // The value returned by this refresh is deliberately discarded.
                                    let _ = local.refresh_entries_for_paths(vec![parent.into()]);
                                }
                                // Then refresh the new path
                                local.refresh_entry(
                                    new_project_path.path.clone(),
                                    Some(old_entry.path),
                                    cx,
                                )
                            }
                        })?
                        .await?
                        .map(CreatedEntry::Included)
                        .unwrap_or_else(|| CreatedEntry::Excluded {
                            abs_path: abs_new_path,
                        }))
                })
            }
            WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
                ..
            } => {
                let response = upstream_client.request(proto::RenameProjectEntry {
                    project_id: *upstream_project_id,
                    entry_id: entry_id.to_proto(),
                    new_path: new_project_path.path.to_proto(),
                    new_worktree_id: new_project_path.worktree_id.to_proto(),
                });
                cx.spawn(async move |_, cx| {
                    let response = response.await?;
                    match response.entry {
                        Some(entry) => new_worktree
                            .update(cx, |worktree, cx| {
                                worktree.as_remote_mut().unwrap().insert_entry(
                                    entry,
                                    response.worktree_scan_id as usize,
                                    cx,
                                )
                            })?
                            .await
                            .map(CreatedEntry::Included),
                        // No entry in the response: report the new absolute path as excluded.
                        None => {
                            let abs_path = new_worktree.read_with(cx, |worktree, _| {
                                worktree.absolutize(&new_project_path.path)
                            })?;
                            Ok(CreatedEntry::Excluded { abs_path })
                        }
                    }
                })
            }
        }
    }
 469    pub fn create_worktree(
 470        &mut self,
 471        abs_path: impl AsRef<Path>,
 472        visible: bool,
 473        cx: &mut Context<Self>,
 474    ) -> Task<Result<Entity<Worktree>>> {
 475        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
 476        if !self.loading_worktrees.contains_key(&abs_path) {
 477            let task = match &self.state {
 478                WorktreeStoreState::Remote {
 479                    upstream_client,
 480                    path_style,
 481                    ..
 482                } => {
 483                    if upstream_client.is_via_collab() {
 484                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 485                    } else {
 486                        let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
 487                        self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
 488                    }
 489                }
 490                WorktreeStoreState::Local { fs } => {
 491                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 492                }
 493            };
 494
 495            self.loading_worktrees
 496                .insert(abs_path.clone(), task.shared());
 497        }
 498        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 499        cx.spawn(async move |this, cx| {
 500            let result = task.await;
 501            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
 502                .ok();
 503            match result {
 504                Ok(worktree) => Ok(worktree),
 505                Err(err) => Err((*err).cloned()),
 506            }
 507        })
 508    }
 509
 510    fn create_remote_worktree(
 511        &mut self,
 512        client: AnyProtoClient,
 513        abs_path: RemotePathBuf,
 514        visible: bool,
 515        cx: &mut Context<Self>,
 516    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 517        let path_style = abs_path.path_style();
 518        let mut abs_path = abs_path.to_string();
 519        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 520        // in which case want to strip the leading the `/`.
 521        // On the host-side, the `~` will get expanded.
 522        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 523        if abs_path.starts_with("/~") {
 524            abs_path = abs_path[1..].to_string();
 525        }
 526        if abs_path.is_empty() {
 527            abs_path = "~/".to_string();
 528        }
 529
 530        cx.spawn(async move |this, cx| {
 531            let this = this.upgrade().context("Dropped worktree store")?;
 532
 533            let path = RemotePathBuf::new(abs_path, path_style);
 534            let response = client
 535                .request(proto::AddWorktree {
 536                    project_id: REMOTE_SERVER_PROJECT_ID,
 537                    path: path.to_proto(),
 538                    visible,
 539                })
 540                .await?;
 541
 542            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 543                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 544            })? {
 545                return Ok(existing_worktree);
 546            }
 547
 548            let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
 549            let root_name = root_path_buf
 550                .file_name()
 551                .map(|n| n.to_string_lossy().into_owned())
 552                .unwrap_or(root_path_buf.to_string_lossy().into_owned());
 553
 554            let worktree = cx.update(|cx| {
 555                Worktree::remote(
 556                    REMOTE_SERVER_PROJECT_ID,
 557                    ReplicaId::REMOTE_SERVER,
 558                    proto::WorktreeMetadata {
 559                        id: response.worktree_id,
 560                        root_name,
 561                        visible,
 562                        abs_path: response.canonicalized_path,
 563                    },
 564                    client,
 565                    path_style,
 566                    cx,
 567                )
 568            })?;
 569
 570            this.update(cx, |this, cx| {
 571                this.add(&worktree, cx);
 572            })?;
 573            Ok(worktree)
 574        })
 575    }
 576
 577    fn create_local_worktree(
 578        &mut self,
 579        fs: Arc<dyn Fs>,
 580        abs_path: Arc<SanitizedPath>,
 581        visible: bool,
 582        cx: &mut Context<Self>,
 583    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 584        let next_entry_id = self.next_entry_id.clone();
 585        let scanning_enabled = self.scanning_enabled;
 586
 587        cx.spawn(async move |this, cx| {
 588            let worktree = Worktree::local(
 589                SanitizedPath::cast_arc(abs_path.clone()),
 590                visible,
 591                fs,
 592                next_entry_id,
 593                scanning_enabled,
 594                cx,
 595            )
 596            .await;
 597
 598            let worktree = worktree?;
 599
 600            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 601
 602            if visible {
 603                cx.update(|cx| {
 604                    cx.add_recent_document(abs_path.as_path());
 605                })
 606                .log_err();
 607            }
 608
 609            Ok(worktree)
 610        })
 611    }
 612
 613    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 614        let worktree_id = worktree.read(cx).id();
 615        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 616
 617        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 618        let handle = if push_strong_handle {
 619            WorktreeHandle::Strong(worktree.clone())
 620        } else {
 621            WorktreeHandle::Weak(worktree.downgrade())
 622        };
 623        if self.worktrees_reordered {
 624            self.worktrees.push(handle);
 625        } else {
 626            let i = match self
 627                .worktrees
 628                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 629                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 630                }) {
 631                Ok(i) | Err(i) => i,
 632            };
 633            self.worktrees.insert(i, handle);
 634        }
 635
 636        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 637        self.send_project_updates(cx);
 638
 639        let handle_id = worktree.entity_id();
 640        cx.subscribe(worktree, |_, worktree, event, cx| {
 641            let worktree_id = worktree.read(cx).id();
 642            match event {
 643                worktree::Event::UpdatedEntries(changes) => {
 644                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 645                        worktree_id,
 646                        changes.clone(),
 647                    ));
 648                }
 649                worktree::Event::UpdatedGitRepositories(set) => {
 650                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 651                        worktree_id,
 652                        set.clone(),
 653                    ));
 654                }
 655                worktree::Event::DeletedEntry(id) => {
 656                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 657                }
 658            }
 659        })
 660        .detach();
 661        cx.observe_release(worktree, move |this, worktree, cx| {
 662            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 663                handle_id,
 664                worktree.id(),
 665            ));
 666            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 667                handle_id,
 668                worktree.id(),
 669            ));
 670            this.send_project_updates(cx);
 671        })
 672        .detach();
 673    }
 674
 675    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 676        self.worktrees.retain(|worktree| {
 677            if let Some(worktree) = worktree.upgrade() {
 678                if worktree.read(cx).id() == id_to_remove {
 679                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 680                        worktree.entity_id(),
 681                        id_to_remove,
 682                    ));
 683                    false
 684                } else {
 685                    true
 686                }
 687            } else {
 688                false
 689            }
 690        });
 691        self.send_project_updates(cx);
 692    }
 693
    /// Sets whether the worktree order has been customized. While the flag is true, `add`
    /// appends new worktrees at the end instead of inserting them in absolute-path order.
    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }
 697
 698    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 699        match &self.state {
 700            WorktreeStoreState::Remote {
 701                upstream_client,
 702                upstream_project_id,
 703                ..
 704            } => Some((upstream_client.clone(), *upstream_project_id)),
 705            WorktreeStoreState::Local { .. } => None,
 706        }
 707    }
 708
 709    pub fn set_worktrees_from_proto(
 710        &mut self,
 711        worktrees: Vec<proto::WorktreeMetadata>,
 712        replica_id: ReplicaId,
 713        cx: &mut Context<Self>,
 714    ) -> Result<()> {
 715        let mut old_worktrees_by_id = self
 716            .worktrees
 717            .drain(..)
 718            .filter_map(|worktree| {
 719                let worktree = worktree.upgrade()?;
 720                Some((worktree.read(cx).id(), worktree))
 721            })
 722            .collect::<HashMap<_, _>>();
 723
 724        let (client, project_id) = self.upstream_client().context("invalid project")?;
 725
 726        for worktree in worktrees {
 727            if let Some(old_worktree) =
 728                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 729            {
 730                let push_strong_handle =
 731                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 732                let handle = if push_strong_handle {
 733                    WorktreeHandle::Strong(old_worktree.clone())
 734                } else {
 735                    WorktreeHandle::Weak(old_worktree.downgrade())
 736                };
 737                self.worktrees.push(handle);
 738            } else {
 739                self.add(
 740                    &Worktree::remote(
 741                        project_id,
 742                        replica_id,
 743                        worktree,
 744                        client.clone(),
 745                        self.path_style(),
 746                        cx,
 747                    ),
 748                    cx,
 749                );
 750            }
 751        }
 752        self.send_project_updates(cx);
 753
 754        Ok(())
 755    }
 756
 757    pub fn move_worktree(
 758        &mut self,
 759        source: WorktreeId,
 760        destination: WorktreeId,
 761        cx: &mut Context<Self>,
 762    ) -> Result<()> {
 763        if source == destination {
 764            return Ok(());
 765        }
 766
 767        let mut source_index = None;
 768        let mut destination_index = None;
 769        for (i, worktree) in self.worktrees.iter().enumerate() {
 770            if let Some(worktree) = worktree.upgrade() {
 771                let worktree_id = worktree.read(cx).id();
 772                if worktree_id == source {
 773                    source_index = Some(i);
 774                    if destination_index.is_some() {
 775                        break;
 776                    }
 777                } else if worktree_id == destination {
 778                    destination_index = Some(i);
 779                    if source_index.is_some() {
 780                        break;
 781                    }
 782                }
 783            }
 784        }
 785
 786        let source_index =
 787            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 788        let destination_index =
 789            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 790
 791        if source_index == destination_index {
 792            return Ok(());
 793        }
 794
 795        let worktree_to_move = self.worktrees.remove(source_index);
 796        self.worktrees.insert(destination_index, worktree_to_move);
 797        self.worktrees_reordered = true;
 798        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 799        cx.notify();
 800        Ok(())
 801    }
 802
 803    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 804        for worktree in &self.worktrees {
 805            if let Some(worktree) = worktree.upgrade() {
 806                worktree.update(cx, |worktree, _| {
 807                    if let Some(worktree) = worktree.as_remote_mut() {
 808                        worktree.disconnected_from_host();
 809                    }
 810                });
 811            }
 812        }
 813    }
 814
 815    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 816        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 817            return;
 818        };
 819
 820        let update = proto::UpdateProject {
 821            project_id,
 822            worktrees: self.worktree_metadata_protos(cx),
 823        };
 824
 825        // collab has bad concurrency guarantees, so we send requests in serial.
 826        let update_project = if downstream_client.is_via_collab() {
 827            Some(downstream_client.request(update))
 828        } else {
 829            downstream_client.send(update).log_err();
 830            None
 831        };
 832        cx.spawn(async move |this, cx| {
 833            if let Some(update_project) = update_project {
 834                update_project.await?;
 835            }
 836
 837            this.update(cx, |this, cx| {
 838                let worktrees = this.worktrees().collect::<Vec<_>>();
 839
 840                for worktree in worktrees {
 841                    worktree.update(cx, |worktree, cx| {
 842                        let client = downstream_client.clone();
 843                        worktree.observe_updates(project_id, cx, {
 844                            move |update| {
 845                                let client = client.clone();
 846                                async move {
 847                                    if client.is_via_collab() {
 848                                        client
 849                                            .request(update)
 850                                            .map(|result| result.log_err().is_some())
 851                                            .await
 852                                    } else {
 853                                        client.send(update).log_err().is_some()
 854                                    }
 855                                }
 856                            }
 857                        });
 858                    });
 859
 860                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 861                }
 862
 863                anyhow::Ok(())
 864            })
 865        })
 866        .detach_and_log_err(cx);
 867    }
 868
 869    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
 870        self.worktrees()
 871            .map(|worktree| {
 872                let worktree = worktree.read(cx);
 873                proto::WorktreeMetadata {
 874                    id: worktree.id().to_proto(),
 875                    root_name: worktree.root_name_str().to_owned(),
 876                    visible: worktree.is_visible(),
 877                    abs_path: worktree.abs_path().to_string_lossy().into_owned(),
 878                }
 879            })
 880            .collect()
 881    }
 882
 883    pub fn shared(
 884        &mut self,
 885        remote_id: u64,
 886        downstream_client: AnyProtoClient,
 887        cx: &mut Context<Self>,
 888    ) {
 889        self.retain_worktrees = true;
 890        self.downstream_client = Some((downstream_client, remote_id));
 891
 892        // When shared, retain all worktrees
 893        for worktree_handle in self.worktrees.iter_mut() {
 894            match worktree_handle {
 895                WorktreeHandle::Strong(_) => {}
 896                WorktreeHandle::Weak(worktree) => {
 897                    if let Some(worktree) = worktree.upgrade() {
 898                        *worktree_handle = WorktreeHandle::Strong(worktree);
 899                    }
 900                }
 901            }
 902        }
 903        self.send_project_updates(cx);
 904    }
 905
 906    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 907        self.retain_worktrees = false;
 908        self.downstream_client.take();
 909
 910        // When not shared, only retain the visible worktrees
 911        for worktree_handle in self.worktrees.iter_mut() {
 912            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 913                let is_visible = worktree.update(cx, |worktree, _| {
 914                    worktree.stop_observing_updates();
 915                    worktree.is_visible()
 916                });
 917                if !is_visible {
 918                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 919                }
 920            }
 921        }
 922    }
 923
    /// Searches the visible local worktrees and returns a channel of project
    /// paths whose on-disk contents *might* match `query`.
    ///
    /// * `query` - the search to run against each candidate file.
    /// * `limit` - forwarding stops once this many paths were sent. The counter
    ///   is incremented before it is compared, so a `limit` of `0` never stops
    ///   the search early.
    /// * `open_entries` - passed through to `find_candidate_paths`.
    /// * `fs` - file system used to read candidate files.
    ///
    /// The returned receiver yields paths in the order their slots were queued
    /// on the output channel.
    pub fn find_search_candidates(
        &self,
        query: SearchQuery,
        limit: usize,
        open_entries: HashSet<ProjectEntryId>,
        fs: Arc<dyn Fs>,
        cx: &Context<Self>,
    ) -> Receiver<ProjectPath> {
        // Snapshot every visible worktree up front; worktrees for which
        // `as_local()` returns `None` are skipped.
        let snapshots = self
            .visible_worktrees(cx)
            .filter_map(|tree| {
                let tree = tree.read(cx);
                Some((tree.snapshot(), tree.as_local()?.settings()))
            })
            .collect::<Vec<_>>();

        let executor = cx.background_executor().clone();

        // We want to return entries in the order they are in the worktrees, so we have one
        // thread that iterates over the worktrees (and ignored directories) as necessary,
        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
        // channel.
        // We spawn a number of workers that take items from the filter channel and check the query
        // against the version of the file on disk.
        let (filter_tx, filter_rx) = smol::channel::bounded(64);
        let (output_tx, output_rx) = smol::channel::bounded(64);
        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();

        // Producer: feeds candidates into the filter and output channels.
        let input = cx.background_spawn({
            let fs = fs.clone();
            let query = query.clone();
            async move {
                Self::find_candidate_paths(
                    fs,
                    snapshots,
                    open_entries,
                    query,
                    filter_tx,
                    output_tx,
                )
                .await
                .log_err();
            }
        });
        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
        // Workers: each one pulls entries from the shared filter channel and
        // checks them against the query; worker errors are logged at debug level.
        let filters = cx.background_spawn(async move {
            let fs = &fs;
            let query = &query;
            executor
                .scoped(move |scope| {
                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
                        let filter_rx = filter_rx.clone();
                        scope.spawn(async move {
                            Self::filter_paths(fs, filter_rx, query)
                                .await
                                .log_with_level(log::Level::Debug);
                        })
                    }
                })
                .await;
        });
        // Collector: awaits each queued slot in order and forwards the paths
        // that matched to the caller.
        cx.background_spawn(async move {
            let mut matched = 0;
            while let Ok(mut receiver) = output_rx.recv().await {
                // A slot that resolves without a path did not match; skip it.
                let Some(path) = receiver.next().await else {
                    continue;
                };
                // The caller dropped the receiver, so nobody is listening anymore.
                let Ok(_) = matching_paths_tx.send(path).await else {
                    break;
                };
                matched += 1;
                if matched == limit {
                    break;
                }
            }
            // The producer and worker tasks are held until here and released
            // once forwarding is over.
            // NOTE(review): dropping a task presumably cancels it — confirm.
            drop(input);
            drop(filters);
        })
        .detach();
        matching_paths_rx
    }
1006
1007    async fn find_candidate_paths(
1008        _: Arc<dyn Fs>,
1009        _: Vec<(worktree::Snapshot, WorktreeSettings)>,
1010        _: HashSet<ProjectEntryId>,
1011        _: SearchQuery,
1012        _: Sender<MatchingEntry>,
1013        _: Sender<oneshot::Receiver<ProjectPath>>,
1014    ) -> Result<()> {
1015        Ok(())
1016    }
1017
1018    async fn filter_paths(
1019        fs: &Arc<dyn Fs>,
1020        input: Receiver<MatchingEntry>,
1021        query: &SearchQuery,
1022    ) -> Result<()> {
1023        let mut input = pin!(input);
1024        while let Some(mut entry) = input.next().await {
1025            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
1026            let Some(file) = fs.open_read(&abs_path).await.log_err() else {
1027                continue;
1028            };
1029
1030            let mut file = BufReader::new(file);
1031            let file_start = file.fill_buf().await?;
1032
1033            if let Err(Some(starting_position)) =
1034                std::str::from_utf8(&file_start).map_err(|e| e.error_len())
1035            {
1036                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1037                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1038                log::debug!(
1039                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1040                );
1041                continue;
1042            }
1043
1044            if query.detect(file).await.unwrap_or(false) {
1045                entry.respond.send(entry.path).await?
1046            }
1047        }
1048
1049        Ok(())
1050    }
1051
1052    pub async fn handle_create_project_entry(
1053        this: Entity<Self>,
1054        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1055        mut cx: AsyncApp,
1056    ) -> Result<proto::ProjectEntryResponse> {
1057        let worktree = this.update(&mut cx, |this, cx| {
1058            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1059            this.worktree_for_id(worktree_id, cx)
1060                .context("worktree not found")
1061        })??;
1062        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1063    }
1064
1065    pub async fn handle_copy_project_entry(
1066        this: Entity<Self>,
1067        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1068        mut cx: AsyncApp,
1069    ) -> Result<proto::ProjectEntryResponse> {
1070        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1071        let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1072        let new_project_path = (
1073            new_worktree_id,
1074            RelPath::from_proto(&envelope.payload.new_path)?,
1075        );
1076        let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1077            let Some((_, project_id)) = this.downstream_client else {
1078                bail!("no downstream client")
1079            };
1080            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1081                bail!("no such entry");
1082            };
1083            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1084                bail!("entry is private")
1085            }
1086
1087            let new_worktree = this
1088                .worktree_for_id(new_worktree_id, cx)
1089                .context("no such worktree")?;
1090            let scan_id = new_worktree.read(cx).scan_id();
1091            anyhow::Ok((
1092                scan_id,
1093                this.copy_entry(entry_id, new_project_path.into(), cx),
1094            ))
1095        })??;
1096        let entry = entry.await?;
1097        Ok(proto::ProjectEntryResponse {
1098            entry: entry.as_ref().map(|entry| entry.into()),
1099            worktree_scan_id: scan_id as u64,
1100        })
1101    }
1102
1103    pub async fn handle_delete_project_entry(
1104        this: Entity<Self>,
1105        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1106        mut cx: AsyncApp,
1107    ) -> Result<proto::ProjectEntryResponse> {
1108        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1109        let worktree = this.update(&mut cx, |this, cx| {
1110            let Some((_, project_id)) = this.downstream_client else {
1111                bail!("no downstream client")
1112            };
1113            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1114                bail!("no entry")
1115            };
1116            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1117                bail!("entry is private")
1118            }
1119            this.worktree_for_entry(entry_id, cx)
1120                .context("worktree not found")
1121        })??;
1122        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1123    }
1124
1125    pub async fn handle_rename_project_entry(
1126        this: Entity<Self>,
1127        request: proto::RenameProjectEntry,
1128        mut cx: AsyncApp,
1129    ) -> Result<proto::ProjectEntryResponse> {
1130        let entry_id = ProjectEntryId::from_proto(request.entry_id);
1131        let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1132        let rel_path = RelPath::from_proto(&request.new_path)
1133            .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1134
1135        let (scan_id, task) = this.update(&mut cx, |this, cx| {
1136            let worktree = this
1137                .worktree_for_entry(entry_id, cx)
1138                .context("no such worktree")?;
1139
1140            let Some((_, project_id)) = this.downstream_client else {
1141                bail!("no downstream client")
1142            };
1143            let entry = worktree
1144                .read(cx)
1145                .entry_for_id(entry_id)
1146                .ok_or_else(|| anyhow!("missing entry"))?;
1147            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1148                bail!("entry is private")
1149            }
1150
1151            let scan_id = worktree.read(cx).scan_id();
1152            anyhow::Ok((
1153                scan_id,
1154                this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1155            ))
1156        })??;
1157        Ok(proto::ProjectEntryResponse {
1158            entry: match &task.await? {
1159                CreatedEntry::Included(entry) => Some(entry.into()),
1160                CreatedEntry::Excluded { .. } => None,
1161            },
1162            worktree_scan_id: scan_id as u64,
1163        })
1164    }
1165
1166    pub async fn handle_expand_project_entry(
1167        this: Entity<Self>,
1168        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1169        mut cx: AsyncApp,
1170    ) -> Result<proto::ExpandProjectEntryResponse> {
1171        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1172        let worktree = this
1173            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1174            .context("invalid request")?;
1175        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1176    }
1177
1178    pub async fn handle_expand_all_for_project_entry(
1179        this: Entity<Self>,
1180        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1181        mut cx: AsyncApp,
1182    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1183        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1184        let worktree = this
1185            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1186            .context("invalid request")?;
1187        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1188    }
1189
1190    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1191        match &self.state {
1192            WorktreeStoreState::Local { fs } => Some(fs.clone()),
1193            WorktreeStoreState::Remote { .. } => None,
1194        }
1195    }
1196}
1197
/// How the store holds on to a worktree.
///
/// The store switches between the two variants when it is shared or unshared:
/// sharing upgrades weak handles to strong ones, and unsharing downgrades the
/// handles of worktrees that are not visible.
#[derive(Clone, Debug)]
enum WorktreeHandle {
    /// A strong handle; the store retains the worktree.
    Strong(Entity<Worktree>),
    /// A weak handle; it must be upgraded before use, which can fail.
    Weak(WeakEntity<Worktree>),
}
1203
1204impl WorktreeHandle {
1205    fn upgrade(&self) -> Option<Entity<Worktree>> {
1206        match self {
1207            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1208            WorktreeHandle::Weak(handle) => handle.upgrade(),
1209        }
1210    }
1211}