worktree_store.rs

   1use std::{
   2    io::{BufRead, BufReader},
   3    path::{Path, PathBuf},
   4    pin::pin,
   5    sync::{Arc, atomic::AtomicUsize},
   6};
   7
   8use anyhow::{Context as _, Result, anyhow, bail};
   9use collections::{HashMap, HashSet};
  10use fs::{Fs, copy_recursive};
  11use futures::{FutureExt, SinkExt, future::Shared};
  12use gpui::{
  13    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
  14};
  15use postage::oneshot;
  16use rpc::{
  17    AnyProtoClient, ErrorExt, TypedEnvelope,
  18    proto::{self, REMOTE_SERVER_PROJECT_ID},
  19};
  20use smol::{
  21    channel::{Receiver, Sender},
  22    stream::StreamExt,
  23};
  24use text::ReplicaId;
  25use util::{
  26    ResultExt,
  27    paths::{PathStyle, RemotePathBuf, SanitizedPath},
  28    rel_path::RelPath,
  29};
  30use worktree::{
  31    CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
  32    WorktreeId, WorktreeSettings,
  33};
  34
  35use crate::{ProjectPath, search::SearchQuery};
  36
  37struct MatchingEntry {
  38    worktree_root: Arc<Path>,
  39    path: ProjectPath,
  40    respond: oneshot::Sender<ProjectPath>,
  41}
  42
  43enum WorktreeStoreState {
  44    Local {
  45        fs: Arc<dyn Fs>,
  46    },
  47    Remote {
  48        upstream_client: AnyProtoClient,
  49        upstream_project_id: u64,
  50        path_style: PathStyle,
  51    },
  52}
  53
  54pub struct WorktreeStore {
  55    next_entry_id: Arc<AtomicUsize>,
  56    downstream_client: Option<(AnyProtoClient, u64)>,
  57    retain_worktrees: bool,
  58    worktrees: Vec<WorktreeHandle>,
  59    worktrees_reordered: bool,
  60    scanning_enabled: bool,
  61    #[allow(clippy::type_complexity)]
  62    loading_worktrees:
  63        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
  64    state: WorktreeStoreState,
  65}
  66
  67#[derive(Debug)]
  68pub enum WorktreeStoreEvent {
  69    WorktreeAdded(Entity<Worktree>),
  70    WorktreeRemoved(EntityId, WorktreeId),
  71    WorktreeReleased(EntityId, WorktreeId),
  72    WorktreeOrderChanged,
  73    WorktreeUpdateSent(Entity<Worktree>),
  74    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
  75    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
  76    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
  77}
  78
  79impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  80
  81impl WorktreeStore {
  82    pub fn init(client: &AnyProtoClient) {
  83        client.add_entity_request_handler(Self::handle_create_project_entry);
  84        client.add_entity_request_handler(Self::handle_copy_project_entry);
  85        client.add_entity_request_handler(Self::handle_delete_project_entry);
  86        client.add_entity_request_handler(Self::handle_expand_project_entry);
  87        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
  88    }
  89
  90    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  91        Self {
  92            next_entry_id: Default::default(),
  93            loading_worktrees: Default::default(),
  94            downstream_client: None,
  95            worktrees: Vec::new(),
  96            worktrees_reordered: false,
  97            scanning_enabled: true,
  98            retain_worktrees,
  99            state: WorktreeStoreState::Local { fs },
 100        }
 101    }
 102
 103    pub fn remote(
 104        retain_worktrees: bool,
 105        upstream_client: AnyProtoClient,
 106        upstream_project_id: u64,
 107        path_style: PathStyle,
 108    ) -> Self {
 109        Self {
 110            next_entry_id: Default::default(),
 111            loading_worktrees: Default::default(),
 112            downstream_client: None,
 113            worktrees: Vec::new(),
 114            worktrees_reordered: false,
 115            scanning_enabled: true,
 116            retain_worktrees,
 117            state: WorktreeStoreState::Remote {
 118                upstream_client,
 119                upstream_project_id,
 120                path_style,
 121            },
 122        }
 123    }
 124
 125    pub fn disable_scanner(&mut self) {
 126        self.scanning_enabled = false;
 127    }
 128
 129    /// Iterates through all worktrees, including ones that don't appear in the project panel
 130    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 131        self.worktrees
 132            .iter()
 133            .filter_map(move |worktree| worktree.upgrade())
 134    }
 135
 136    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 137    pub fn visible_worktrees<'a>(
 138        &'a self,
 139        cx: &'a App,
 140    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 141        self.worktrees()
 142            .filter(|worktree| worktree.read(cx).is_visible())
 143    }
 144
 145    /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
 146    pub fn visible_worktrees_and_single_files<'a>(
 147        &'a self,
 148        cx: &'a App,
 149    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 150        self.worktrees()
 151            .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
 152    }
 153
 154    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 155        self.worktrees()
 156            .find(|worktree| worktree.read(cx).id() == id)
 157    }
 158
 159    pub fn worktree_for_entry(
 160        &self,
 161        entry_id: ProjectEntryId,
 162        cx: &App,
 163    ) -> Option<Entity<Worktree>> {
 164        self.worktrees()
 165            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 166    }
 167
 168    pub fn find_worktree(
 169        &self,
 170        abs_path: impl AsRef<Path>,
 171        cx: &App,
 172    ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
 173        let abs_path = SanitizedPath::new(abs_path.as_ref());
 174        for tree in self.worktrees() {
 175            let path_style = tree.read(cx).path_style();
 176            if let Ok(relative_path) = abs_path.as_ref().strip_prefix(tree.read(cx).abs_path())
 177                && let Ok(relative_path) = RelPath::new(relative_path, path_style)
 178            {
 179                return Some((tree.clone(), relative_path.into_arc()));
 180            }
 181        }
 182        None
 183    }
 184
 185    pub fn project_path_for_absolute_path(&self, abs_path: &Path, cx: &App) -> Option<ProjectPath> {
 186        self.find_worktree(abs_path, cx)
 187            .map(|(worktree, relative_path)| ProjectPath {
 188                worktree_id: worktree.read(cx).id(),
 189                path: relative_path,
 190            })
 191    }
 192
 193    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
 194        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 195        Some(worktree.read(cx).absolutize(&project_path.path))
 196    }
 197
 198    pub fn path_style(&self) -> PathStyle {
 199        match &self.state {
 200            WorktreeStoreState::Local { .. } => PathStyle::local(),
 201            WorktreeStoreState::Remote { path_style, .. } => *path_style,
 202        }
 203    }
 204
 205    pub fn find_or_create_worktree(
 206        &mut self,
 207        abs_path: impl AsRef<Path>,
 208        visible: bool,
 209        cx: &mut Context<Self>,
 210    ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
 211        let abs_path = abs_path.as_ref();
 212        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 213            Task::ready(Ok((tree, relative_path)))
 214        } else {
 215            let worktree = self.create_worktree(abs_path, visible, cx);
 216            cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
 217        }
 218    }
 219
 220    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 221        self.worktrees()
 222            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 223    }
 224
 225    pub fn worktree_and_entry_for_id<'a>(
 226        &'a self,
 227        entry_id: ProjectEntryId,
 228        cx: &'a App,
 229    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 230        self.worktrees().find_map(|worktree| {
 231            worktree
 232                .read(cx)
 233                .entry_for_id(entry_id)
 234                .map(|e| (worktree.clone(), e))
 235        })
 236    }
 237
 238    pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
 239        self.worktree_for_id(path.worktree_id, cx)?
 240            .read(cx)
 241            .entry_for_path(&path.path)
 242    }
 243
 244    pub fn copy_entry(
 245        &mut self,
 246        entry_id: ProjectEntryId,
 247        new_project_path: ProjectPath,
 248        cx: &mut Context<Self>,
 249    ) -> Task<Result<Option<Entry>>> {
 250        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 251            return Task::ready(Err(anyhow!("no such worktree")));
 252        };
 253        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
 254            return Task::ready(Err(anyhow!("no such entry")));
 255        };
 256        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 257            return Task::ready(Err(anyhow!("no such worktree")));
 258        };
 259
 260        match &self.state {
 261            WorktreeStoreState::Local { fs } => {
 262                let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
 263                let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
 264                let fs = fs.clone();
 265                let copy = cx.background_spawn(async move {
 266                    copy_recursive(
 267                        fs.as_ref(),
 268                        &old_abs_path,
 269                        &new_abs_path,
 270                        Default::default(),
 271                    )
 272                    .await
 273                });
 274
 275                cx.spawn(async move |_, cx| {
 276                    copy.await?;
 277                    new_worktree
 278                        .update(cx, |this, cx| {
 279                            this.as_local_mut().unwrap().refresh_entry(
 280                                new_project_path.path,
 281                                None,
 282                                cx,
 283                            )
 284                        })?
 285                        .await
 286                })
 287            }
 288            WorktreeStoreState::Remote {
 289                upstream_client,
 290                upstream_project_id,
 291                ..
 292            } => {
 293                let response = upstream_client.request(proto::CopyProjectEntry {
 294                    project_id: *upstream_project_id,
 295                    entry_id: entry_id.to_proto(),
 296                    new_path: new_project_path.path.to_proto(),
 297                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 298                });
 299                cx.spawn(async move |_, cx| {
 300                    let response = response.await?;
 301                    match response.entry {
 302                        Some(entry) => new_worktree
 303                            .update(cx, |worktree, cx| {
 304                                worktree.as_remote_mut().unwrap().insert_entry(
 305                                    entry,
 306                                    response.worktree_scan_id as usize,
 307                                    cx,
 308                                )
 309                            })?
 310                            .await
 311                            .map(Some),
 312                        None => Ok(None),
 313                    }
 314                })
 315            }
 316        }
 317    }
 318
 319    pub fn rename_entry(
 320        &mut self,
 321        entry_id: ProjectEntryId,
 322        new_project_path: ProjectPath,
 323        cx: &mut Context<Self>,
 324    ) -> Task<Result<CreatedEntry>> {
 325        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 326            return Task::ready(Err(anyhow!("no such worktree")));
 327        };
 328        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
 329            return Task::ready(Err(anyhow!("no such entry")));
 330        };
 331        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 332            return Task::ready(Err(anyhow!("no such worktree")));
 333        };
 334
 335        match &self.state {
 336            WorktreeStoreState::Local { fs } => {
 337                let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
 338                let new_worktree_ref = new_worktree.read(cx);
 339                let is_root_entry = new_worktree_ref
 340                    .root_entry()
 341                    .is_some_and(|e| e.id == entry_id);
 342                let abs_new_path = if is_root_entry {
 343                    let abs_path = new_worktree_ref.abs_path();
 344                    let Some(root_parent_path) = abs_path.parent() else {
 345                        return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
 346                    };
 347                    root_parent_path.join(new_project_path.path.as_std_path())
 348                } else {
 349                    new_worktree_ref.absolutize(&new_project_path.path)
 350                };
 351
 352                let fs = fs.clone();
 353                let case_sensitive = new_worktree
 354                    .read(cx)
 355                    .as_local()
 356                    .unwrap()
 357                    .fs_is_case_sensitive();
 358
 359                let do_rename =
 360                    async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
 361                        fs.rename(
 362                            &old_path,
 363                            &new_path,
 364                            fs::RenameOptions {
 365                                overwrite,
 366                                ..fs::RenameOptions::default()
 367                            },
 368                        )
 369                        .await
 370                        .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
 371                    };
 372
 373                let rename = cx.background_spawn({
 374                    let abs_new_path = abs_new_path.clone();
 375                    async move {
 376                        // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
 377                        // we want to overwrite, because otherwise we run into a file-already-exists error.
 378                        let overwrite = !case_sensitive
 379                            && abs_old_path != abs_new_path
 380                            && abs_old_path.to_str().map(|p| p.to_lowercase())
 381                                == abs_new_path.to_str().map(|p| p.to_lowercase());
 382
 383                        // The directory we're renaming into might not exist yet
 384                        if let Err(e) =
 385                            do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
 386                        {
 387                            if let Some(err) = e.downcast_ref::<std::io::Error>()
 388                                && err.kind() == std::io::ErrorKind::NotFound
 389                            {
 390                                if let Some(parent) = abs_new_path.parent() {
 391                                    fs.create_dir(parent).await.with_context(|| {
 392                                        format!("creating parent directory {parent:?}")
 393                                    })?;
 394                                    return do_rename(
 395                                        fs.as_ref(),
 396                                        &abs_old_path,
 397                                        &abs_new_path,
 398                                        overwrite,
 399                                    )
 400                                    .await;
 401                                }
 402                            }
 403                            return Err(e);
 404                        }
 405                        Ok(())
 406                    }
 407                });
 408
 409                cx.spawn(async move |_, cx| {
 410                    rename.await?;
 411                    Ok(new_worktree
 412                        .update(cx, |this, cx| {
 413                            let local = this.as_local_mut().unwrap();
 414                            if is_root_entry {
 415                                // We eagerly update `abs_path` and refresh this worktree.
 416                                // Otherwise, the FS watcher would do it on the `RootUpdated` event,
 417                                // but with a noticeable delay, so we handle it proactively.
 418                                local.update_abs_path_and_refresh(
 419                                    SanitizedPath::new_arc(&abs_new_path),
 420                                    cx,
 421                                );
 422                                Task::ready(Ok(this.root_entry().cloned()))
 423                            } else {
 424                                // First refresh the parent directory (in case it was newly created)
 425                                if let Some(parent) = new_project_path.path.parent() {
 426                                    let _ = local.refresh_entries_for_paths(vec![parent.into()]);
 427                                }
 428                                // Then refresh the new path
 429                                local.refresh_entry(
 430                                    new_project_path.path.clone(),
 431                                    Some(old_entry.path),
 432                                    cx,
 433                                )
 434                            }
 435                        })?
 436                        .await?
 437                        .map(CreatedEntry::Included)
 438                        .unwrap_or_else(|| CreatedEntry::Excluded {
 439                            abs_path: abs_new_path,
 440                        }))
 441                })
 442            }
 443            WorktreeStoreState::Remote {
 444                upstream_client,
 445                upstream_project_id,
 446                ..
 447            } => {
 448                let response = upstream_client.request(proto::RenameProjectEntry {
 449                    project_id: *upstream_project_id,
 450                    entry_id: entry_id.to_proto(),
 451                    new_path: new_project_path.path.to_proto(),
 452                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 453                });
 454                cx.spawn(async move |_, cx| {
 455                    let response = response.await?;
 456                    match response.entry {
 457                        Some(entry) => new_worktree
 458                            .update(cx, |worktree, cx| {
 459                                worktree.as_remote_mut().unwrap().insert_entry(
 460                                    entry,
 461                                    response.worktree_scan_id as usize,
 462                                    cx,
 463                                )
 464                            })?
 465                            .await
 466                            .map(CreatedEntry::Included),
 467                        None => {
 468                            let abs_path = new_worktree.read_with(cx, |worktree, _| {
 469                                worktree.absolutize(&new_project_path.path)
 470                            })?;
 471                            Ok(CreatedEntry::Excluded { abs_path })
 472                        }
 473                    }
 474                })
 475            }
 476        }
 477    }
 478    pub fn create_worktree(
 479        &mut self,
 480        abs_path: impl AsRef<Path>,
 481        visible: bool,
 482        cx: &mut Context<Self>,
 483    ) -> Task<Result<Entity<Worktree>>> {
 484        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
 485        if !self.loading_worktrees.contains_key(&abs_path) {
 486            let task = match &self.state {
 487                WorktreeStoreState::Remote {
 488                    upstream_client,
 489                    path_style,
 490                    ..
 491                } => {
 492                    if upstream_client.is_via_collab() {
 493                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 494                    } else {
 495                        let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
 496                        self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
 497                    }
 498                }
 499                WorktreeStoreState::Local { fs } => {
 500                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 501                }
 502            };
 503
 504            self.loading_worktrees
 505                .insert(abs_path.clone(), task.shared());
 506        }
 507        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 508        cx.spawn(async move |this, cx| {
 509            let result = task.await;
 510            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
 511                .ok();
 512            match result {
 513                Ok(worktree) => Ok(worktree),
 514                Err(err) => Err((*err).cloned()),
 515            }
 516        })
 517    }
 518
 519    fn create_remote_worktree(
 520        &mut self,
 521        client: AnyProtoClient,
 522        abs_path: RemotePathBuf,
 523        visible: bool,
 524        cx: &mut Context<Self>,
 525    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 526        let path_style = abs_path.path_style();
 527        let mut abs_path = abs_path.to_string();
 528        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 529        // in which case want to strip the leading the `/`.
 530        // On the host-side, the `~` will get expanded.
 531        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 532        if abs_path.starts_with("/~") {
 533            abs_path = abs_path[1..].to_string();
 534        }
 535        if abs_path.is_empty() {
 536            abs_path = "~/".to_string();
 537        }
 538
 539        cx.spawn(async move |this, cx| {
 540            let this = this.upgrade().context("Dropped worktree store")?;
 541
 542            let path = RemotePathBuf::new(abs_path, path_style);
 543            let response = client
 544                .request(proto::AddWorktree {
 545                    project_id: REMOTE_SERVER_PROJECT_ID,
 546                    path: path.to_proto(),
 547                    visible,
 548                })
 549                .await?;
 550
 551            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 552                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 553            })? {
 554                return Ok(existing_worktree);
 555            }
 556
 557            let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
 558            let root_name = root_path_buf
 559                .file_name()
 560                .map(|n| n.to_string_lossy().into_owned())
 561                .unwrap_or(root_path_buf.to_string_lossy().into_owned());
 562
 563            let worktree = cx.update(|cx| {
 564                Worktree::remote(
 565                    REMOTE_SERVER_PROJECT_ID,
 566                    ReplicaId::REMOTE_SERVER,
 567                    proto::WorktreeMetadata {
 568                        id: response.worktree_id,
 569                        root_name,
 570                        visible,
 571                        abs_path: response.canonicalized_path,
 572                    },
 573                    client,
 574                    path_style,
 575                    cx,
 576                )
 577            })?;
 578
 579            this.update(cx, |this, cx| {
 580                this.add(&worktree, cx);
 581            })?;
 582            Ok(worktree)
 583        })
 584    }
 585
 586    fn create_local_worktree(
 587        &mut self,
 588        fs: Arc<dyn Fs>,
 589        abs_path: Arc<SanitizedPath>,
 590        visible: bool,
 591        cx: &mut Context<Self>,
 592    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 593        let next_entry_id = self.next_entry_id.clone();
 594        let scanning_enabled = self.scanning_enabled;
 595
 596        cx.spawn(async move |this, cx| {
 597            let worktree = Worktree::local(
 598                SanitizedPath::cast_arc(abs_path.clone()),
 599                visible,
 600                fs,
 601                next_entry_id,
 602                scanning_enabled,
 603                cx,
 604            )
 605            .await;
 606
 607            let worktree = worktree?;
 608
 609            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 610
 611            if visible {
 612                cx.update(|cx| {
 613                    cx.add_recent_document(abs_path.as_path());
 614                })
 615                .log_err();
 616            }
 617
 618            Ok(worktree)
 619        })
 620    }
 621
 622    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 623        let worktree_id = worktree.read(cx).id();
 624        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 625
 626        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 627        let handle = if push_strong_handle {
 628            WorktreeHandle::Strong(worktree.clone())
 629        } else {
 630            WorktreeHandle::Weak(worktree.downgrade())
 631        };
 632        if self.worktrees_reordered {
 633            self.worktrees.push(handle);
 634        } else {
 635            let i = match self
 636                .worktrees
 637                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 638                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 639                }) {
 640                Ok(i) | Err(i) => i,
 641            };
 642            self.worktrees.insert(i, handle);
 643        }
 644
 645        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 646        self.send_project_updates(cx);
 647
 648        let handle_id = worktree.entity_id();
 649        cx.subscribe(worktree, |_, worktree, event, cx| {
 650            let worktree_id = worktree.read(cx).id();
 651            match event {
 652                worktree::Event::UpdatedEntries(changes) => {
 653                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 654                        worktree_id,
 655                        changes.clone(),
 656                    ));
 657                }
 658                worktree::Event::UpdatedGitRepositories(set) => {
 659                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 660                        worktree_id,
 661                        set.clone(),
 662                    ));
 663                }
 664                worktree::Event::DeletedEntry(id) => {
 665                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 666                }
 667            }
 668        })
 669        .detach();
 670        cx.observe_release(worktree, move |this, worktree, cx| {
 671            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 672                handle_id,
 673                worktree.id(),
 674            ));
 675            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 676                handle_id,
 677                worktree.id(),
 678            ));
 679            this.send_project_updates(cx);
 680        })
 681        .detach();
 682    }
 683
 684    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 685        self.worktrees.retain(|worktree| {
 686            if let Some(worktree) = worktree.upgrade() {
 687                if worktree.read(cx).id() == id_to_remove {
 688                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 689                        worktree.entity_id(),
 690                        id_to_remove,
 691                    ));
 692                    false
 693                } else {
 694                    true
 695                }
 696            } else {
 697                false
 698            }
 699        });
 700        self.send_project_updates(cx);
 701    }
 702
 703    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 704        self.worktrees_reordered = worktrees_reordered;
 705    }
 706
 707    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 708        match &self.state {
 709            WorktreeStoreState::Remote {
 710                upstream_client,
 711                upstream_project_id,
 712                ..
 713            } => Some((upstream_client.clone(), *upstream_project_id)),
 714            WorktreeStoreState::Local { .. } => None,
 715        }
 716    }
 717
 718    pub fn set_worktrees_from_proto(
 719        &mut self,
 720        worktrees: Vec<proto::WorktreeMetadata>,
 721        replica_id: ReplicaId,
 722        cx: &mut Context<Self>,
 723    ) -> Result<()> {
 724        let mut old_worktrees_by_id = self
 725            .worktrees
 726            .drain(..)
 727            .filter_map(|worktree| {
 728                let worktree = worktree.upgrade()?;
 729                Some((worktree.read(cx).id(), worktree))
 730            })
 731            .collect::<HashMap<_, _>>();
 732
 733        let (client, project_id) = self.upstream_client().context("invalid project")?;
 734
 735        for worktree in worktrees {
 736            if let Some(old_worktree) =
 737                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 738            {
 739                let push_strong_handle =
 740                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 741                let handle = if push_strong_handle {
 742                    WorktreeHandle::Strong(old_worktree.clone())
 743                } else {
 744                    WorktreeHandle::Weak(old_worktree.downgrade())
 745                };
 746                self.worktrees.push(handle);
 747            } else {
 748                self.add(
 749                    &Worktree::remote(
 750                        project_id,
 751                        replica_id,
 752                        worktree,
 753                        client.clone(),
 754                        self.path_style(),
 755                        cx,
 756                    ),
 757                    cx,
 758                );
 759            }
 760        }
 761        self.send_project_updates(cx);
 762
 763        Ok(())
 764    }
 765
 766    pub fn move_worktree(
 767        &mut self,
 768        source: WorktreeId,
 769        destination: WorktreeId,
 770        cx: &mut Context<Self>,
 771    ) -> Result<()> {
 772        if source == destination {
 773            return Ok(());
 774        }
 775
 776        let mut source_index = None;
 777        let mut destination_index = None;
 778        for (i, worktree) in self.worktrees.iter().enumerate() {
 779            if let Some(worktree) = worktree.upgrade() {
 780                let worktree_id = worktree.read(cx).id();
 781                if worktree_id == source {
 782                    source_index = Some(i);
 783                    if destination_index.is_some() {
 784                        break;
 785                    }
 786                } else if worktree_id == destination {
 787                    destination_index = Some(i);
 788                    if source_index.is_some() {
 789                        break;
 790                    }
 791                }
 792            }
 793        }
 794
 795        let source_index =
 796            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 797        let destination_index =
 798            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 799
 800        if source_index == destination_index {
 801            return Ok(());
 802        }
 803
 804        let worktree_to_move = self.worktrees.remove(source_index);
 805        self.worktrees.insert(destination_index, worktree_to_move);
 806        self.worktrees_reordered = true;
 807        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 808        cx.notify();
 809        Ok(())
 810    }
 811
 812    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 813        for worktree in &self.worktrees {
 814            if let Some(worktree) = worktree.upgrade() {
 815                worktree.update(cx, |worktree, _| {
 816                    if let Some(worktree) = worktree.as_remote_mut() {
 817                        worktree.disconnected_from_host();
 818                    }
 819                });
 820            }
 821        }
 822    }
 823
 824    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 825        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 826            return;
 827        };
 828
 829        let update = proto::UpdateProject {
 830            project_id,
 831            worktrees: self.worktree_metadata_protos(cx),
 832        };
 833
 834        // collab has bad concurrency guarantees, so we send requests in serial.
 835        let update_project = if downstream_client.is_via_collab() {
 836            Some(downstream_client.request(update))
 837        } else {
 838            downstream_client.send(update).log_err();
 839            None
 840        };
 841        cx.spawn(async move |this, cx| {
 842            if let Some(update_project) = update_project {
 843                update_project.await?;
 844            }
 845
 846            this.update(cx, |this, cx| {
 847                let worktrees = this.worktrees().collect::<Vec<_>>();
 848
 849                for worktree in worktrees {
 850                    worktree.update(cx, |worktree, cx| {
 851                        let client = downstream_client.clone();
 852                        worktree.observe_updates(project_id, cx, {
 853                            move |update| {
 854                                let client = client.clone();
 855                                async move {
 856                                    if client.is_via_collab() {
 857                                        client
 858                                            .request(update)
 859                                            .map(|result| result.log_err().is_some())
 860                                            .await
 861                                    } else {
 862                                        client.send(update).log_err().is_some()
 863                                    }
 864                                }
 865                            }
 866                        });
 867                    });
 868
 869                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 870                }
 871
 872                anyhow::Ok(())
 873            })
 874        })
 875        .detach_and_log_err(cx);
 876    }
 877
 878    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
 879        self.worktrees()
 880            .map(|worktree| {
 881                let worktree = worktree.read(cx);
 882                proto::WorktreeMetadata {
 883                    id: worktree.id().to_proto(),
 884                    root_name: worktree.root_name_str().to_owned(),
 885                    visible: worktree.is_visible(),
 886                    abs_path: worktree.abs_path().to_string_lossy().into_owned(),
 887                }
 888            })
 889            .collect()
 890    }
 891
 892    pub fn shared(
 893        &mut self,
 894        remote_id: u64,
 895        downstream_client: AnyProtoClient,
 896        cx: &mut Context<Self>,
 897    ) {
 898        self.retain_worktrees = true;
 899        self.downstream_client = Some((downstream_client, remote_id));
 900
 901        // When shared, retain all worktrees
 902        for worktree_handle in self.worktrees.iter_mut() {
 903            match worktree_handle {
 904                WorktreeHandle::Strong(_) => {}
 905                WorktreeHandle::Weak(worktree) => {
 906                    if let Some(worktree) = worktree.upgrade() {
 907                        *worktree_handle = WorktreeHandle::Strong(worktree);
 908                    }
 909                }
 910            }
 911        }
 912        self.send_project_updates(cx);
 913    }
 914
 915    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 916        self.retain_worktrees = false;
 917        self.downstream_client.take();
 918
 919        // When not shared, only retain the visible worktrees
 920        for worktree_handle in self.worktrees.iter_mut() {
 921            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 922                let is_visible = worktree.update(cx, |worktree, _| {
 923                    worktree.stop_observing_updates();
 924                    worktree.is_visible()
 925                });
 926                if !is_visible {
 927                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 928                }
 929            }
 930        }
 931    }
 932
 933    /// search over all worktrees and return buffers that *might* match the search.
 934    pub fn find_search_candidates(
 935        &self,
 936        query: SearchQuery,
 937        limit: usize,
 938        open_entries: HashSet<ProjectEntryId>,
 939        fs: Arc<dyn Fs>,
 940        cx: &Context<Self>,
 941    ) -> Receiver<ProjectPath> {
 942        let snapshots = self
 943            .visible_worktrees(cx)
 944            .filter_map(|tree| {
 945                let tree = tree.read(cx);
 946                Some((tree.snapshot(), tree.as_local()?.settings()))
 947            })
 948            .collect::<Vec<_>>();
 949
 950        let executor = cx.background_executor().clone();
 951
 952        // We want to return entries in the order they are in the worktrees, so we have one
 953        // thread that iterates over the worktrees (and ignored directories) as necessary,
 954        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 955        // channel.
 956        // We spawn a number of workers that take items from the filter channel and check the query
 957        // against the version of the file on disk.
 958        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 959        let (output_tx, output_rx) = smol::channel::bounded(64);
 960        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 961
 962        let input = cx.background_spawn({
 963            let fs = fs.clone();
 964            let query = query.clone();
 965            async move {
 966                Self::find_candidate_paths(
 967                    fs,
 968                    snapshots,
 969                    open_entries,
 970                    query,
 971                    filter_tx,
 972                    output_tx,
 973                )
 974                .await
 975                .log_err();
 976            }
 977        });
 978        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 979        let filters = cx.background_spawn(async move {
 980            let fs = &fs;
 981            let query = &query;
 982            executor
 983                .scoped(move |scope| {
 984                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 985                        let filter_rx = filter_rx.clone();
 986                        scope.spawn(async move {
 987                            Self::filter_paths(fs, filter_rx, query)
 988                                .await
 989                                .log_with_level(log::Level::Debug);
 990                        })
 991                    }
 992                })
 993                .await;
 994        });
 995        cx.background_spawn(async move {
 996            let mut matched = 0;
 997            while let Ok(mut receiver) = output_rx.recv().await {
 998                let Some(path) = receiver.next().await else {
 999                    continue;
1000                };
1001                let Ok(_) = matching_paths_tx.send(path).await else {
1002                    break;
1003                };
1004                matched += 1;
1005                if matched == limit {
1006                    break;
1007                }
1008            }
1009            drop(input);
1010            drop(filters);
1011        })
1012        .detach();
1013        matching_paths_rx
1014    }
1015
1016    async fn find_candidate_paths(
1017        _: Arc<dyn Fs>,
1018        _: Vec<(worktree::Snapshot, WorktreeSettings)>,
1019        _: HashSet<ProjectEntryId>,
1020        _: SearchQuery,
1021        _: Sender<MatchingEntry>,
1022        _: Sender<oneshot::Receiver<ProjectPath>>,
1023    ) -> Result<()> {
1024        Ok(())
1025    }
1026
1027    async fn filter_paths(
1028        fs: &Arc<dyn Fs>,
1029        input: Receiver<MatchingEntry>,
1030        query: &SearchQuery,
1031    ) -> Result<()> {
1032        let mut input = pin!(input);
1033        while let Some(mut entry) = input.next().await {
1034            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
1035            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
1036                continue;
1037            };
1038
1039            let mut file = BufReader::new(file);
1040            let file_start = file.fill_buf()?;
1041
1042            if let Err(Some(starting_position)) =
1043                std::str::from_utf8(file_start).map_err(|e| e.error_len())
1044            {
1045                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1046                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1047                log::debug!(
1048                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1049                );
1050                continue;
1051            }
1052
1053            if query.detect(file).unwrap_or(false) {
1054                entry.respond.send(entry.path).await?
1055            }
1056        }
1057
1058        Ok(())
1059    }
1060
1061    pub async fn handle_create_project_entry(
1062        this: Entity<Self>,
1063        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1064        mut cx: AsyncApp,
1065    ) -> Result<proto::ProjectEntryResponse> {
1066        let worktree = this.update(&mut cx, |this, cx| {
1067            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1068            this.worktree_for_id(worktree_id, cx)
1069                .context("worktree not found")
1070        })??;
1071        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1072    }
1073
1074    pub async fn handle_copy_project_entry(
1075        this: Entity<Self>,
1076        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1077        mut cx: AsyncApp,
1078    ) -> Result<proto::ProjectEntryResponse> {
1079        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1080        let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1081        let new_project_path = (
1082            new_worktree_id,
1083            RelPath::from_proto(&envelope.payload.new_path)?,
1084        );
1085        let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1086            let Some((_, project_id)) = this.downstream_client else {
1087                bail!("no downstream client")
1088            };
1089            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1090                bail!("no such entry");
1091            };
1092            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1093                bail!("entry is private")
1094            }
1095
1096            let new_worktree = this
1097                .worktree_for_id(new_worktree_id, cx)
1098                .context("no such worktree")?;
1099            let scan_id = new_worktree.read(cx).scan_id();
1100            anyhow::Ok((
1101                scan_id,
1102                this.copy_entry(entry_id, new_project_path.into(), cx),
1103            ))
1104        })??;
1105        let entry = entry.await?;
1106        Ok(proto::ProjectEntryResponse {
1107            entry: entry.as_ref().map(|entry| entry.into()),
1108            worktree_scan_id: scan_id as u64,
1109        })
1110    }
1111
1112    pub async fn handle_delete_project_entry(
1113        this: Entity<Self>,
1114        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1115        mut cx: AsyncApp,
1116    ) -> Result<proto::ProjectEntryResponse> {
1117        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1118        let worktree = this.update(&mut cx, |this, cx| {
1119            let Some((_, project_id)) = this.downstream_client else {
1120                bail!("no downstream client")
1121            };
1122            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1123                bail!("no entry")
1124            };
1125            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1126                bail!("entry is private")
1127            }
1128            this.worktree_for_entry(entry_id, cx)
1129                .context("worktree not found")
1130        })??;
1131        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1132    }
1133
1134    pub async fn handle_rename_project_entry(
1135        this: Entity<Self>,
1136        request: proto::RenameProjectEntry,
1137        mut cx: AsyncApp,
1138    ) -> Result<proto::ProjectEntryResponse> {
1139        let entry_id = ProjectEntryId::from_proto(request.entry_id);
1140        let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1141        let rel_path = RelPath::from_proto(&request.new_path)
1142            .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1143
1144        let (scan_id, task) = this.update(&mut cx, |this, cx| {
1145            let worktree = this
1146                .worktree_for_entry(entry_id, cx)
1147                .context("no such worktree")?;
1148
1149            let Some((_, project_id)) = this.downstream_client else {
1150                bail!("no downstream client")
1151            };
1152            let entry = worktree
1153                .read(cx)
1154                .entry_for_id(entry_id)
1155                .ok_or_else(|| anyhow!("missing entry"))?;
1156            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1157                bail!("entry is private")
1158            }
1159
1160            let scan_id = worktree.read(cx).scan_id();
1161            anyhow::Ok((
1162                scan_id,
1163                this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1164            ))
1165        })??;
1166        Ok(proto::ProjectEntryResponse {
1167            entry: match &task.await? {
1168                CreatedEntry::Included(entry) => Some(entry.into()),
1169                CreatedEntry::Excluded { .. } => None,
1170            },
1171            worktree_scan_id: scan_id as u64,
1172        })
1173    }
1174
1175    pub async fn handle_expand_project_entry(
1176        this: Entity<Self>,
1177        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1178        mut cx: AsyncApp,
1179    ) -> Result<proto::ExpandProjectEntryResponse> {
1180        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1181        let worktree = this
1182            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1183            .context("invalid request")?;
1184        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1185    }
1186
1187    pub async fn handle_expand_all_for_project_entry(
1188        this: Entity<Self>,
1189        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1190        mut cx: AsyncApp,
1191    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1192        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1193        let worktree = this
1194            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1195            .context("invalid request")?;
1196        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1197    }
1198
1199    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1200        match &self.state {
1201            WorktreeStoreState::Local { fs } => Some(fs.clone()),
1202            WorktreeStoreState::Remote { .. } => None,
1203        }
1204    }
1205}
1206
1207#[derive(Clone, Debug)]
1208enum WorktreeHandle {
1209    Strong(Entity<Worktree>),
1210    Weak(WeakEntity<Worktree>),
1211}
1212
1213impl WorktreeHandle {
1214    fn upgrade(&self) -> Option<Entity<Worktree>> {
1215        match self {
1216            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1217            WorktreeHandle::Weak(handle) => handle.upgrade(),
1218        }
1219    }
1220}