worktree_store.rs

   1use std::{
   2    io::{BufRead, BufReader},
   3    path::{Path, PathBuf},
   4    pin::pin,
   5    sync::{Arc, atomic::AtomicUsize},
   6};
   7
   8use anyhow::{Context as _, Result, anyhow, bail};
   9use collections::{HashMap, HashSet};
  10use fs::{Fs, copy_recursive};
  11use futures::{FutureExt, SinkExt, future::Shared};
  12use gpui::{
  13    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
  14};
  15use postage::oneshot;
  16use rpc::{
  17    AnyProtoClient, ErrorExt, TypedEnvelope,
  18    proto::{self, REMOTE_SERVER_PROJECT_ID},
  19};
  20use smol::{
  21    channel::{Receiver, Sender},
  22    stream::StreamExt,
  23};
  24use text::ReplicaId;
  25use util::{
  26    ResultExt,
  27    paths::{PathStyle, RemotePathBuf, SanitizedPath},
  28    rel_path::RelPath,
  29};
  30use worktree::{
  31    CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
  32    WorktreeId, WorktreeSettings,
  33};
  34
  35use crate::{ProjectPath, search::SearchQuery, trusted_worktrees::TrustedWorktrees};
  36
  37struct MatchingEntry {
  38    worktree_root: Arc<Path>,
  39    path: ProjectPath,
  40    respond: oneshot::Sender<ProjectPath>,
  41}
  42
  43enum WorktreeStoreState {
  44    Local {
  45        fs: Arc<dyn Fs>,
  46    },
  47    Remote {
  48        upstream_client: AnyProtoClient,
  49        upstream_project_id: u64,
  50        path_style: PathStyle,
  51    },
  52}
  53
  54pub struct WorktreeStore {
  55    next_entry_id: Arc<AtomicUsize>,
  56    downstream_client: Option<(AnyProtoClient, u64)>,
  57    retain_worktrees: bool,
  58    worktrees: Vec<WorktreeHandle>,
  59    worktrees_reordered: bool,
  60    scanning_enabled: bool,
  61    #[allow(clippy::type_complexity)]
  62    loading_worktrees:
  63        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
  64    state: WorktreeStoreState,
  65}
  66
  67#[derive(Debug)]
  68pub enum WorktreeStoreEvent {
  69    WorktreeAdded(Entity<Worktree>),
  70    WorktreeRemoved(EntityId, WorktreeId),
  71    WorktreeReleased(EntityId, WorktreeId),
  72    WorktreeOrderChanged,
  73    WorktreeUpdateSent(Entity<Worktree>),
  74    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
  75    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
  76    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
  77}
  78
  79impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  80
  81impl WorktreeStore {
  82    pub fn init(client: &AnyProtoClient) {
  83        client.add_entity_request_handler(Self::handle_create_project_entry);
  84        client.add_entity_request_handler(Self::handle_copy_project_entry);
  85        client.add_entity_request_handler(Self::handle_delete_project_entry);
  86        client.add_entity_request_handler(Self::handle_expand_project_entry);
  87        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
  88    }
  89
  90    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  91        Self {
  92            next_entry_id: Default::default(),
  93            loading_worktrees: Default::default(),
  94            downstream_client: None,
  95            worktrees: Vec::new(),
  96            worktrees_reordered: false,
  97            scanning_enabled: true,
  98            retain_worktrees,
  99            state: WorktreeStoreState::Local { fs },
 100        }
 101    }
 102
 103    pub fn remote(
 104        retain_worktrees: bool,
 105        upstream_client: AnyProtoClient,
 106        upstream_project_id: u64,
 107        path_style: PathStyle,
 108    ) -> Self {
 109        Self {
 110            next_entry_id: Default::default(),
 111            loading_worktrees: Default::default(),
 112            downstream_client: None,
 113            worktrees: Vec::new(),
 114            worktrees_reordered: false,
 115            scanning_enabled: true,
 116            retain_worktrees,
 117            state: WorktreeStoreState::Remote {
 118                upstream_client,
 119                upstream_project_id,
 120                path_style,
 121            },
 122        }
 123    }
 124
 125    pub fn disable_scanner(&mut self) {
 126        self.scanning_enabled = false;
 127    }
 128
 129    /// Iterates through all worktrees, including ones that don't appear in the project panel
 130    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 131        self.worktrees
 132            .iter()
 133            .filter_map(move |worktree| worktree.upgrade())
 134    }
 135
 136    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 137    pub fn visible_worktrees<'a>(
 138        &'a self,
 139        cx: &'a App,
 140    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 141        self.worktrees()
 142            .filter(|worktree| worktree.read(cx).is_visible())
 143    }
 144
 145    /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
 146    pub fn visible_worktrees_and_single_files<'a>(
 147        &'a self,
 148        cx: &'a App,
 149    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 150        self.worktrees()
 151            .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
 152    }
 153
 154    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 155        self.worktrees()
 156            .find(|worktree| worktree.read(cx).id() == id)
 157    }
 158
 159    pub fn worktree_for_entry(
 160        &self,
 161        entry_id: ProjectEntryId,
 162        cx: &App,
 163    ) -> Option<Entity<Worktree>> {
 164        self.worktrees()
 165            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 166    }
 167
 168    pub fn find_worktree(
 169        &self,
 170        abs_path: impl AsRef<Path>,
 171        cx: &App,
 172    ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
 173        let abs_path = SanitizedPath::new(abs_path.as_ref());
 174        for tree in self.worktrees() {
 175            let path_style = tree.read(cx).path_style();
 176            if let Ok(relative_path) = abs_path.as_ref().strip_prefix(tree.read(cx).abs_path())
 177                && let Ok(relative_path) = RelPath::new(relative_path, path_style)
 178            {
 179                return Some((tree.clone(), relative_path.into_arc()));
 180            }
 181        }
 182        None
 183    }
 184
 185    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
 186        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 187        Some(worktree.read(cx).absolutize(&project_path.path))
 188    }
 189
 190    pub fn path_style(&self) -> PathStyle {
 191        match &self.state {
 192            WorktreeStoreState::Local { .. } => PathStyle::local(),
 193            WorktreeStoreState::Remote { path_style, .. } => *path_style,
 194        }
 195    }
 196
 197    pub fn find_or_create_worktree(
 198        &mut self,
 199        abs_path: impl AsRef<Path>,
 200        visible: bool,
 201        cx: &mut Context<Self>,
 202    ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
 203        let abs_path = abs_path.as_ref();
 204        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 205            Task::ready(Ok((tree, relative_path)))
 206        } else {
 207            let worktree = self.create_worktree(abs_path, visible, cx);
 208            cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
 209        }
 210    }
 211
 212    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 213        self.worktrees()
 214            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 215    }
 216
 217    pub fn worktree_and_entry_for_id<'a>(
 218        &'a self,
 219        entry_id: ProjectEntryId,
 220        cx: &'a App,
 221    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 222        self.worktrees().find_map(|worktree| {
 223            worktree
 224                .read(cx)
 225                .entry_for_id(entry_id)
 226                .map(|e| (worktree.clone(), e))
 227        })
 228    }
 229
 230    pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
 231        self.worktree_for_id(path.worktree_id, cx)?
 232            .read(cx)
 233            .entry_for_path(&path.path)
 234    }
 235
 236    pub fn copy_entry(
 237        &mut self,
 238        entry_id: ProjectEntryId,
 239        new_project_path: ProjectPath,
 240        cx: &mut Context<Self>,
 241    ) -> Task<Result<Option<Entry>>> {
 242        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 243            return Task::ready(Err(anyhow!("no such worktree")));
 244        };
 245        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
 246            return Task::ready(Err(anyhow!("no such entry")));
 247        };
 248        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 249            return Task::ready(Err(anyhow!("no such worktree")));
 250        };
 251
 252        match &self.state {
 253            WorktreeStoreState::Local { fs } => {
 254                let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
 255                let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
 256                let fs = fs.clone();
 257                let copy = cx.background_spawn(async move {
 258                    copy_recursive(
 259                        fs.as_ref(),
 260                        &old_abs_path,
 261                        &new_abs_path,
 262                        Default::default(),
 263                    )
 264                    .await
 265                });
 266
 267                cx.spawn(async move |_, cx| {
 268                    copy.await?;
 269                    new_worktree
 270                        .update(cx, |this, cx| {
 271                            this.as_local_mut().unwrap().refresh_entry(
 272                                new_project_path.path,
 273                                None,
 274                                cx,
 275                            )
 276                        })?
 277                        .await
 278                })
 279            }
 280            WorktreeStoreState::Remote {
 281                upstream_client,
 282                upstream_project_id,
 283                ..
 284            } => {
 285                let response = upstream_client.request(proto::CopyProjectEntry {
 286                    project_id: *upstream_project_id,
 287                    entry_id: entry_id.to_proto(),
 288                    new_path: new_project_path.path.to_proto(),
 289                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 290                });
 291                cx.spawn(async move |_, cx| {
 292                    let response = response.await?;
 293                    match response.entry {
 294                        Some(entry) => new_worktree
 295                            .update(cx, |worktree, cx| {
 296                                worktree.as_remote_mut().unwrap().insert_entry(
 297                                    entry,
 298                                    response.worktree_scan_id as usize,
 299                                    cx,
 300                                )
 301                            })?
 302                            .await
 303                            .map(Some),
 304                        None => Ok(None),
 305                    }
 306                })
 307            }
 308        }
 309    }
 310
 311    pub fn rename_entry(
 312        &mut self,
 313        entry_id: ProjectEntryId,
 314        new_project_path: ProjectPath,
 315        cx: &mut Context<Self>,
 316    ) -> Task<Result<CreatedEntry>> {
 317        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
 318            return Task::ready(Err(anyhow!("no such worktree")));
 319        };
 320        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
 321            return Task::ready(Err(anyhow!("no such entry")));
 322        };
 323        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
 324            return Task::ready(Err(anyhow!("no such worktree")));
 325        };
 326
 327        match &self.state {
 328            WorktreeStoreState::Local { fs } => {
 329                let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
 330                let new_worktree_ref = new_worktree.read(cx);
 331                let is_root_entry = new_worktree_ref
 332                    .root_entry()
 333                    .is_some_and(|e| e.id == entry_id);
 334                let abs_new_path = if is_root_entry {
 335                    let abs_path = new_worktree_ref.abs_path();
 336                    let Some(root_parent_path) = abs_path.parent() else {
 337                        return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
 338                    };
 339                    root_parent_path.join(new_project_path.path.as_std_path())
 340                } else {
 341                    new_worktree_ref.absolutize(&new_project_path.path)
 342                };
 343
 344                let fs = fs.clone();
 345                let case_sensitive = new_worktree
 346                    .read(cx)
 347                    .as_local()
 348                    .unwrap()
 349                    .fs_is_case_sensitive();
 350
 351                let do_rename =
 352                    async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
 353                        fs.rename(
 354                            &old_path,
 355                            &new_path,
 356                            fs::RenameOptions {
 357                                overwrite,
 358                                ..fs::RenameOptions::default()
 359                            },
 360                        )
 361                        .await
 362                        .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
 363                    };
 364
 365                let rename = cx.background_spawn({
 366                    let abs_new_path = abs_new_path.clone();
 367                    async move {
 368                        // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
 369                        // we want to overwrite, because otherwise we run into a file-already-exists error.
 370                        let overwrite = !case_sensitive
 371                            && abs_old_path != abs_new_path
 372                            && abs_old_path.to_str().map(|p| p.to_lowercase())
 373                                == abs_new_path.to_str().map(|p| p.to_lowercase());
 374
 375                        // The directory we're renaming into might not exist yet
 376                        if let Err(e) =
 377                            do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
 378                        {
 379                            if let Some(err) = e.downcast_ref::<std::io::Error>()
 380                                && err.kind() == std::io::ErrorKind::NotFound
 381                            {
 382                                if let Some(parent) = abs_new_path.parent() {
 383                                    fs.create_dir(parent).await.with_context(|| {
 384                                        format!("creating parent directory {parent:?}")
 385                                    })?;
 386                                    return do_rename(
 387                                        fs.as_ref(),
 388                                        &abs_old_path,
 389                                        &abs_new_path,
 390                                        overwrite,
 391                                    )
 392                                    .await;
 393                                }
 394                            }
 395                            return Err(e);
 396                        }
 397                        Ok(())
 398                    }
 399                });
 400
 401                cx.spawn(async move |_, cx| {
 402                    rename.await?;
 403                    Ok(new_worktree
 404                        .update(cx, |this, cx| {
 405                            let local = this.as_local_mut().unwrap();
 406                            if is_root_entry {
 407                                // We eagerly update `abs_path` and refresh this worktree.
 408                                // Otherwise, the FS watcher would do it on the `RootUpdated` event,
 409                                // but with a noticeable delay, so we handle it proactively.
 410                                local.update_abs_path_and_refresh(
 411                                    SanitizedPath::new_arc(&abs_new_path),
 412                                    cx,
 413                                );
 414                                Task::ready(Ok(this.root_entry().cloned()))
 415                            } else {
 416                                // First refresh the parent directory (in case it was newly created)
 417                                if let Some(parent) = new_project_path.path.parent() {
 418                                    let _ = local.refresh_entries_for_paths(vec![parent.into()]);
 419                                }
 420                                // Then refresh the new path
 421                                local.refresh_entry(
 422                                    new_project_path.path.clone(),
 423                                    Some(old_entry.path),
 424                                    cx,
 425                                )
 426                            }
 427                        })?
 428                        .await?
 429                        .map(CreatedEntry::Included)
 430                        .unwrap_or_else(|| CreatedEntry::Excluded {
 431                            abs_path: abs_new_path,
 432                        }))
 433                })
 434            }
 435            WorktreeStoreState::Remote {
 436                upstream_client,
 437                upstream_project_id,
 438                ..
 439            } => {
 440                let response = upstream_client.request(proto::RenameProjectEntry {
 441                    project_id: *upstream_project_id,
 442                    entry_id: entry_id.to_proto(),
 443                    new_path: new_project_path.path.to_proto(),
 444                    new_worktree_id: new_project_path.worktree_id.to_proto(),
 445                });
 446                cx.spawn(async move |_, cx| {
 447                    let response = response.await?;
 448                    match response.entry {
 449                        Some(entry) => new_worktree
 450                            .update(cx, |worktree, cx| {
 451                                worktree.as_remote_mut().unwrap().insert_entry(
 452                                    entry,
 453                                    response.worktree_scan_id as usize,
 454                                    cx,
 455                                )
 456                            })?
 457                            .await
 458                            .map(CreatedEntry::Included),
 459                        None => {
 460                            let abs_path = new_worktree.read_with(cx, |worktree, _| {
 461                                worktree.absolutize(&new_project_path.path)
 462                            })?;
 463                            Ok(CreatedEntry::Excluded { abs_path })
 464                        }
 465                    }
 466                })
 467            }
 468        }
 469    }
 470    pub fn create_worktree(
 471        &mut self,
 472        abs_path: impl AsRef<Path>,
 473        visible: bool,
 474        cx: &mut Context<Self>,
 475    ) -> Task<Result<Entity<Worktree>>> {
 476        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
 477        let is_via_collab = matches!(&self.state, WorktreeStoreState::Remote { upstream_client, .. } if upstream_client.is_via_collab());
 478        if !self.loading_worktrees.contains_key(&abs_path) {
 479            let task = match &self.state {
 480                WorktreeStoreState::Remote {
 481                    upstream_client,
 482                    path_style,
 483                    ..
 484                } => {
 485                    if upstream_client.is_via_collab() {
 486                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 487                    } else {
 488                        let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
 489                        self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
 490                    }
 491                }
 492                WorktreeStoreState::Local { fs } => {
 493                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 494                }
 495            };
 496
 497            self.loading_worktrees
 498                .insert(abs_path.clone(), task.shared());
 499        }
 500        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 501        cx.spawn(async move |this, cx| {
 502            let result = task.await;
 503            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
 504                .ok();
 505            match result {
 506                Ok(worktree) => {
 507                    if !is_via_collab {
 508                        if let Some((trusted_worktrees, worktree_store)) = this
 509                            .update(cx, |_, cx| {
 510                                TrustedWorktrees::try_get_global(cx).zip(Some(cx.entity()))
 511                            })
 512                            .ok()
 513                            .flatten()
 514                        {
 515                            trusted_worktrees
 516                                .update(cx, |trusted_worktrees, cx| {
 517                                    trusted_worktrees.can_trust(
 518                                        &worktree_store,
 519                                        worktree.read(cx).id(),
 520                                        cx,
 521                                    );
 522                                })
 523                                .ok();
 524                        }
 525                    }
 526                    Ok(worktree)
 527                }
 528                Err(err) => Err((*err).cloned()),
 529            }
 530        })
 531    }
 532
 533    fn create_remote_worktree(
 534        &mut self,
 535        client: AnyProtoClient,
 536        abs_path: RemotePathBuf,
 537        visible: bool,
 538        cx: &mut Context<Self>,
 539    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 540        let path_style = abs_path.path_style();
 541        let mut abs_path = abs_path.to_string();
 542        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 543        // in which case want to strip the leading the `/`.
 544        // On the host-side, the `~` will get expanded.
 545        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 546        if abs_path.starts_with("/~") {
 547            abs_path = abs_path[1..].to_string();
 548        }
 549        if abs_path.is_empty() {
 550            abs_path = "~/".to_string();
 551        }
 552
 553        cx.spawn(async move |this, cx| {
 554            let this = this.upgrade().context("Dropped worktree store")?;
 555
 556            let path = RemotePathBuf::new(abs_path, path_style);
 557            let response = client
 558                .request(proto::AddWorktree {
 559                    project_id: REMOTE_SERVER_PROJECT_ID,
 560                    path: path.to_proto(),
 561                    visible,
 562                })
 563                .await?;
 564
 565            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 566                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 567            })? {
 568                return Ok(existing_worktree);
 569            }
 570
 571            let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
 572            let root_name = root_path_buf
 573                .file_name()
 574                .map(|n| n.to_string_lossy().into_owned())
 575                .unwrap_or(root_path_buf.to_string_lossy().into_owned());
 576
 577            let worktree = cx.update(|cx| {
 578                Worktree::remote(
 579                    REMOTE_SERVER_PROJECT_ID,
 580                    ReplicaId::REMOTE_SERVER,
 581                    proto::WorktreeMetadata {
 582                        id: response.worktree_id,
 583                        root_name,
 584                        visible,
 585                        abs_path: response.canonicalized_path,
 586                    },
 587                    client,
 588                    path_style,
 589                    cx,
 590                )
 591            })?;
 592
 593            this.update(cx, |this, cx| {
 594                this.add(&worktree, cx);
 595            })?;
 596            Ok(worktree)
 597        })
 598    }
 599
 600    fn create_local_worktree(
 601        &mut self,
 602        fs: Arc<dyn Fs>,
 603        abs_path: Arc<SanitizedPath>,
 604        visible: bool,
 605        cx: &mut Context<Self>,
 606    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 607        let next_entry_id = self.next_entry_id.clone();
 608        let scanning_enabled = self.scanning_enabled;
 609
 610        cx.spawn(async move |this, cx| {
 611            let worktree = Worktree::local(
 612                SanitizedPath::cast_arc(abs_path.clone()),
 613                visible,
 614                fs,
 615                next_entry_id,
 616                scanning_enabled,
 617                cx,
 618            )
 619            .await;
 620
 621            let worktree = worktree?;
 622
 623            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 624
 625            if visible {
 626                cx.update(|cx| {
 627                    cx.add_recent_document(abs_path.as_path());
 628                })
 629                .log_err();
 630            }
 631
 632            Ok(worktree)
 633        })
 634    }
 635
 636    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 637        let worktree_id = worktree.read(cx).id();
 638        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 639
 640        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 641        let handle = if push_strong_handle {
 642            WorktreeHandle::Strong(worktree.clone())
 643        } else {
 644            WorktreeHandle::Weak(worktree.downgrade())
 645        };
 646        if self.worktrees_reordered {
 647            self.worktrees.push(handle);
 648        } else {
 649            let i = match self
 650                .worktrees
 651                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 652                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 653                }) {
 654                Ok(i) | Err(i) => i,
 655            };
 656            self.worktrees.insert(i, handle);
 657        }
 658
 659        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 660        self.send_project_updates(cx);
 661
 662        let handle_id = worktree.entity_id();
 663        cx.subscribe(worktree, |_, worktree, event, cx| {
 664            let worktree_id = worktree.read(cx).id();
 665            match event {
 666                worktree::Event::UpdatedEntries(changes) => {
 667                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 668                        worktree_id,
 669                        changes.clone(),
 670                    ));
 671                }
 672                worktree::Event::UpdatedGitRepositories(set) => {
 673                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 674                        worktree_id,
 675                        set.clone(),
 676                    ));
 677                }
 678                worktree::Event::DeletedEntry(id) => {
 679                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 680                }
 681            }
 682        })
 683        .detach();
 684        cx.observe_release(worktree, move |this, worktree, cx| {
 685            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 686                handle_id,
 687                worktree.id(),
 688            ));
 689            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 690                handle_id,
 691                worktree.id(),
 692            ));
 693            this.send_project_updates(cx);
 694        })
 695        .detach();
 696    }
 697
 698    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 699        self.worktrees.retain(|worktree| {
 700            if let Some(worktree) = worktree.upgrade() {
 701                if worktree.read(cx).id() == id_to_remove {
 702                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 703                        worktree.entity_id(),
 704                        id_to_remove,
 705                    ));
 706                    false
 707                } else {
 708                    true
 709                }
 710            } else {
 711                false
 712            }
 713        });
 714        self.send_project_updates(cx);
 715    }
 716
 717    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 718        self.worktrees_reordered = worktrees_reordered;
 719    }
 720
 721    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 722        match &self.state {
 723            WorktreeStoreState::Remote {
 724                upstream_client,
 725                upstream_project_id,
 726                ..
 727            } => Some((upstream_client.clone(), *upstream_project_id)),
 728            WorktreeStoreState::Local { .. } => None,
 729        }
 730    }
 731
 732    pub fn set_worktrees_from_proto(
 733        &mut self,
 734        worktrees: Vec<proto::WorktreeMetadata>,
 735        replica_id: ReplicaId,
 736        cx: &mut Context<Self>,
 737    ) -> Result<()> {
 738        let mut old_worktrees_by_id = self
 739            .worktrees
 740            .drain(..)
 741            .filter_map(|worktree| {
 742                let worktree = worktree.upgrade()?;
 743                Some((worktree.read(cx).id(), worktree))
 744            })
 745            .collect::<HashMap<_, _>>();
 746
 747        let (client, project_id) = self.upstream_client().context("invalid project")?;
 748
 749        for worktree in worktrees {
 750            if let Some(old_worktree) =
 751                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 752            {
 753                let push_strong_handle =
 754                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 755                let handle = if push_strong_handle {
 756                    WorktreeHandle::Strong(old_worktree.clone())
 757                } else {
 758                    WorktreeHandle::Weak(old_worktree.downgrade())
 759                };
 760                self.worktrees.push(handle);
 761            } else {
 762                self.add(
 763                    &Worktree::remote(
 764                        project_id,
 765                        replica_id,
 766                        worktree,
 767                        client.clone(),
 768                        self.path_style(),
 769                        cx,
 770                    ),
 771                    cx,
 772                );
 773            }
 774        }
 775        self.send_project_updates(cx);
 776
 777        Ok(())
 778    }
 779
 780    pub fn move_worktree(
 781        &mut self,
 782        source: WorktreeId,
 783        destination: WorktreeId,
 784        cx: &mut Context<Self>,
 785    ) -> Result<()> {
 786        if source == destination {
 787            return Ok(());
 788        }
 789
 790        let mut source_index = None;
 791        let mut destination_index = None;
 792        for (i, worktree) in self.worktrees.iter().enumerate() {
 793            if let Some(worktree) = worktree.upgrade() {
 794                let worktree_id = worktree.read(cx).id();
 795                if worktree_id == source {
 796                    source_index = Some(i);
 797                    if destination_index.is_some() {
 798                        break;
 799                    }
 800                } else if worktree_id == destination {
 801                    destination_index = Some(i);
 802                    if source_index.is_some() {
 803                        break;
 804                    }
 805                }
 806            }
 807        }
 808
 809        let source_index =
 810            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 811        let destination_index =
 812            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 813
 814        if source_index == destination_index {
 815            return Ok(());
 816        }
 817
 818        let worktree_to_move = self.worktrees.remove(source_index);
 819        self.worktrees.insert(destination_index, worktree_to_move);
 820        self.worktrees_reordered = true;
 821        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 822        cx.notify();
 823        Ok(())
 824    }
 825
 826    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 827        for worktree in &self.worktrees {
 828            if let Some(worktree) = worktree.upgrade() {
 829                worktree.update(cx, |worktree, _| {
 830                    if let Some(worktree) = worktree.as_remote_mut() {
 831                        worktree.disconnected_from_host();
 832                    }
 833                });
 834            }
 835        }
 836    }
 837
 838    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 839        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 840            return;
 841        };
 842
 843        let update = proto::UpdateProject {
 844            project_id,
 845            worktrees: self.worktree_metadata_protos(cx),
 846        };
 847
 848        // collab has bad concurrency guarantees, so we send requests in serial.
 849        let update_project = if downstream_client.is_via_collab() {
 850            Some(downstream_client.request(update))
 851        } else {
 852            downstream_client.send(update).log_err();
 853            None
 854        };
 855        cx.spawn(async move |this, cx| {
 856            if let Some(update_project) = update_project {
 857                update_project.await?;
 858            }
 859
 860            this.update(cx, |this, cx| {
 861                let worktrees = this.worktrees().collect::<Vec<_>>();
 862
 863                for worktree in worktrees {
 864                    worktree.update(cx, |worktree, cx| {
 865                        let client = downstream_client.clone();
 866                        worktree.observe_updates(project_id, cx, {
 867                            move |update| {
 868                                let client = client.clone();
 869                                async move {
 870                                    if client.is_via_collab() {
 871                                        client
 872                                            .request(update)
 873                                            .map(|result| result.log_err().is_some())
 874                                            .await
 875                                    } else {
 876                                        client.send(update).log_err().is_some()
 877                                    }
 878                                }
 879                            }
 880                        });
 881                    });
 882
 883                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 884                }
 885
 886                anyhow::Ok(())
 887            })
 888        })
 889        .detach_and_log_err(cx);
 890    }
 891
 892    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
 893        self.worktrees()
 894            .map(|worktree| {
 895                let worktree = worktree.read(cx);
 896                proto::WorktreeMetadata {
 897                    id: worktree.id().to_proto(),
 898                    root_name: worktree.root_name_str().to_owned(),
 899                    visible: worktree.is_visible(),
 900                    abs_path: worktree.abs_path().to_string_lossy().into_owned(),
 901                }
 902            })
 903            .collect()
 904    }
 905
 906    pub fn shared(
 907        &mut self,
 908        remote_id: u64,
 909        downstream_client: AnyProtoClient,
 910        cx: &mut Context<Self>,
 911    ) {
 912        self.retain_worktrees = true;
 913        self.downstream_client = Some((downstream_client, remote_id));
 914
 915        // When shared, retain all worktrees
 916        for worktree_handle in self.worktrees.iter_mut() {
 917            match worktree_handle {
 918                WorktreeHandle::Strong(_) => {}
 919                WorktreeHandle::Weak(worktree) => {
 920                    if let Some(worktree) = worktree.upgrade() {
 921                        *worktree_handle = WorktreeHandle::Strong(worktree);
 922                    }
 923                }
 924            }
 925        }
 926        self.send_project_updates(cx);
 927    }
 928
 929    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 930        self.retain_worktrees = false;
 931        self.downstream_client.take();
 932
 933        // When not shared, only retain the visible worktrees
 934        for worktree_handle in self.worktrees.iter_mut() {
 935            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 936                let is_visible = worktree.update(cx, |worktree, _| {
 937                    worktree.stop_observing_updates();
 938                    worktree.is_visible()
 939                });
 940                if !is_visible {
 941                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 942                }
 943            }
 944        }
 945    }
 946
 947    /// search over all worktrees and return buffers that *might* match the search.
 948    pub fn find_search_candidates(
 949        &self,
 950        query: SearchQuery,
 951        limit: usize,
 952        open_entries: HashSet<ProjectEntryId>,
 953        fs: Arc<dyn Fs>,
 954        cx: &Context<Self>,
 955    ) -> Receiver<ProjectPath> {
 956        let snapshots = self
 957            .visible_worktrees(cx)
 958            .filter_map(|tree| {
 959                let tree = tree.read(cx);
 960                Some((tree.snapshot(), tree.as_local()?.settings()))
 961            })
 962            .collect::<Vec<_>>();
 963
 964        let executor = cx.background_executor().clone();
 965
 966        // We want to return entries in the order they are in the worktrees, so we have one
 967        // thread that iterates over the worktrees (and ignored directories) as necessary,
 968        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 969        // channel.
 970        // We spawn a number of workers that take items from the filter channel and check the query
 971        // against the version of the file on disk.
 972        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 973        let (output_tx, output_rx) = smol::channel::bounded(64);
 974        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 975
 976        let input = cx.background_spawn({
 977            let fs = fs.clone();
 978            let query = query.clone();
 979            async move {
 980                Self::find_candidate_paths(
 981                    fs,
 982                    snapshots,
 983                    open_entries,
 984                    query,
 985                    filter_tx,
 986                    output_tx,
 987                )
 988                .await
 989                .log_err();
 990            }
 991        });
 992        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 993        let filters = cx.background_spawn(async move {
 994            let fs = &fs;
 995            let query = &query;
 996            executor
 997                .scoped(move |scope| {
 998                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 999                        let filter_rx = filter_rx.clone();
1000                        scope.spawn(async move {
1001                            Self::filter_paths(fs, filter_rx, query)
1002                                .await
1003                                .log_with_level(log::Level::Debug);
1004                        })
1005                    }
1006                })
1007                .await;
1008        });
1009        cx.background_spawn(async move {
1010            let mut matched = 0;
1011            while let Ok(mut receiver) = output_rx.recv().await {
1012                let Some(path) = receiver.next().await else {
1013                    continue;
1014                };
1015                let Ok(_) = matching_paths_tx.send(path).await else {
1016                    break;
1017                };
1018                matched += 1;
1019                if matched == limit {
1020                    break;
1021                }
1022            }
1023            drop(input);
1024            drop(filters);
1025        })
1026        .detach();
1027        matching_paths_rx
1028    }
1029
1030    async fn find_candidate_paths(
1031        _: Arc<dyn Fs>,
1032        _: Vec<(worktree::Snapshot, WorktreeSettings)>,
1033        _: HashSet<ProjectEntryId>,
1034        _: SearchQuery,
1035        _: Sender<MatchingEntry>,
1036        _: Sender<oneshot::Receiver<ProjectPath>>,
1037    ) -> Result<()> {
1038        Ok(())
1039    }
1040
1041    async fn filter_paths(
1042        fs: &Arc<dyn Fs>,
1043        input: Receiver<MatchingEntry>,
1044        query: &SearchQuery,
1045    ) -> Result<()> {
1046        let mut input = pin!(input);
1047        while let Some(mut entry) = input.next().await {
1048            let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
1049            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
1050                continue;
1051            };
1052
1053            let mut file = BufReader::new(file);
1054            let file_start = file.fill_buf()?;
1055
1056            if let Err(Some(starting_position)) =
1057                std::str::from_utf8(file_start).map_err(|e| e.error_len())
1058            {
1059                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1060                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1061                log::debug!(
1062                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1063                );
1064                continue;
1065            }
1066
1067            if query.detect(file).unwrap_or(false) {
1068                entry.respond.send(entry.path).await?
1069            }
1070        }
1071
1072        Ok(())
1073    }
1074
1075    pub async fn handle_create_project_entry(
1076        this: Entity<Self>,
1077        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1078        mut cx: AsyncApp,
1079    ) -> Result<proto::ProjectEntryResponse> {
1080        let worktree = this.update(&mut cx, |this, cx| {
1081            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1082            this.worktree_for_id(worktree_id, cx)
1083                .context("worktree not found")
1084        })??;
1085        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1086    }
1087
1088    pub async fn handle_copy_project_entry(
1089        this: Entity<Self>,
1090        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1091        mut cx: AsyncApp,
1092    ) -> Result<proto::ProjectEntryResponse> {
1093        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1094        let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1095        let new_project_path = (
1096            new_worktree_id,
1097            RelPath::from_proto(&envelope.payload.new_path)?,
1098        );
1099        let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1100            let Some((_, project_id)) = this.downstream_client else {
1101                bail!("no downstream client")
1102            };
1103            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1104                bail!("no such entry");
1105            };
1106            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1107                bail!("entry is private")
1108            }
1109
1110            let new_worktree = this
1111                .worktree_for_id(new_worktree_id, cx)
1112                .context("no such worktree")?;
1113            let scan_id = new_worktree.read(cx).scan_id();
1114            anyhow::Ok((
1115                scan_id,
1116                this.copy_entry(entry_id, new_project_path.into(), cx),
1117            ))
1118        })??;
1119        let entry = entry.await?;
1120        Ok(proto::ProjectEntryResponse {
1121            entry: entry.as_ref().map(|entry| entry.into()),
1122            worktree_scan_id: scan_id as u64,
1123        })
1124    }
1125
1126    pub async fn handle_delete_project_entry(
1127        this: Entity<Self>,
1128        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1129        mut cx: AsyncApp,
1130    ) -> Result<proto::ProjectEntryResponse> {
1131        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1132        let worktree = this.update(&mut cx, |this, cx| {
1133            let Some((_, project_id)) = this.downstream_client else {
1134                bail!("no downstream client")
1135            };
1136            let Some(entry) = this.entry_for_id(entry_id, cx) else {
1137                bail!("no entry")
1138            };
1139            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1140                bail!("entry is private")
1141            }
1142            this.worktree_for_entry(entry_id, cx)
1143                .context("worktree not found")
1144        })??;
1145        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1146    }
1147
1148    pub async fn handle_rename_project_entry(
1149        this: Entity<Self>,
1150        request: proto::RenameProjectEntry,
1151        mut cx: AsyncApp,
1152    ) -> Result<proto::ProjectEntryResponse> {
1153        let entry_id = ProjectEntryId::from_proto(request.entry_id);
1154        let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1155        let rel_path = RelPath::from_proto(&request.new_path)
1156            .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1157
1158        let (scan_id, task) = this.update(&mut cx, |this, cx| {
1159            let worktree = this
1160                .worktree_for_entry(entry_id, cx)
1161                .context("no such worktree")?;
1162
1163            let Some((_, project_id)) = this.downstream_client else {
1164                bail!("no downstream client")
1165            };
1166            let entry = worktree
1167                .read(cx)
1168                .entry_for_id(entry_id)
1169                .ok_or_else(|| anyhow!("missing entry"))?;
1170            if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1171                bail!("entry is private")
1172            }
1173
1174            let scan_id = worktree.read(cx).scan_id();
1175            anyhow::Ok((
1176                scan_id,
1177                this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1178            ))
1179        })??;
1180        Ok(proto::ProjectEntryResponse {
1181            entry: match &task.await? {
1182                CreatedEntry::Included(entry) => Some(entry.into()),
1183                CreatedEntry::Excluded { .. } => None,
1184            },
1185            worktree_scan_id: scan_id as u64,
1186        })
1187    }
1188
1189    pub async fn handle_expand_project_entry(
1190        this: Entity<Self>,
1191        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1192        mut cx: AsyncApp,
1193    ) -> Result<proto::ExpandProjectEntryResponse> {
1194        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1195        let worktree = this
1196            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1197            .context("invalid request")?;
1198        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1199    }
1200
1201    pub async fn handle_expand_all_for_project_entry(
1202        this: Entity<Self>,
1203        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1204        mut cx: AsyncApp,
1205    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1206        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1207        let worktree = this
1208            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1209            .context("invalid request")?;
1210        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1211    }
1212
1213    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1214        match &self.state {
1215            WorktreeStoreState::Local { fs } => Some(fs.clone()),
1216            WorktreeStoreState::Remote { .. } => None,
1217        }
1218    }
1219}
1220
1221#[derive(Clone, Debug)]
1222enum WorktreeHandle {
1223    Strong(Entity<Worktree>),
1224    Weak(WeakEntity<Worktree>),
1225}
1226
1227impl WorktreeHandle {
1228    fn upgrade(&self) -> Option<Entity<Worktree>> {
1229        match self {
1230            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1231            WorktreeHandle::Weak(handle) => handle.upgrade(),
1232        }
1233    }
1234}