worktree_store.rs

   1use std::{
   2    path::{Path, PathBuf},
   3    sync::{atomic::AtomicUsize, Arc},
   4};
   5
   6use anyhow::{anyhow, Context as _, Result};
   7use collections::{HashMap, HashSet};
   8use fs::Fs;
   9use futures::{
  10    future::{BoxFuture, Shared},
  11    FutureExt, SinkExt,
  12};
  13use gpui::{
  14    AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, Task, WeakModel,
  15};
  16use postage::oneshot;
  17use rpc::{
  18    proto::{self, SSH_PROJECT_ID},
  19    AnyProtoClient, ErrorExt, TypedEnvelope,
  20};
  21use smol::{
  22    channel::{Receiver, Sender},
  23    stream::StreamExt,
  24};
  25use text::ReplicaId;
  26use util::{paths::SanitizedPath, ResultExt};
  27use worktree::{Entry, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
  28
  29use crate::{search::SearchQuery, LspStore, ProjectPath};
  30
  31struct MatchingEntry {
  32    worktree_path: Arc<Path>,
  33    path: ProjectPath,
  34    respond: oneshot::Sender<ProjectPath>,
  35}
  36
  37enum WorktreeStoreState {
  38    Local {
  39        fs: Arc<dyn Fs>,
  40    },
  41    Remote {
  42        upstream_client: AnyProtoClient,
  43        upstream_project_id: u64,
  44    },
  45}
  46
  47pub struct WorktreeStore {
  48    next_entry_id: Arc<AtomicUsize>,
  49    downstream_client: Option<(AnyProtoClient, u64)>,
  50    retain_worktrees: bool,
  51    worktrees: Vec<WorktreeHandle>,
  52    worktrees_reordered: bool,
  53    #[allow(clippy::type_complexity)]
  54    loading_worktrees:
  55        HashMap<SanitizedPath, Shared<Task<Result<Model<Worktree>, Arc<anyhow::Error>>>>>,
  56    state: WorktreeStoreState,
  57}
  58
  59pub enum WorktreeStoreEvent {
  60    WorktreeAdded(Model<Worktree>),
  61    WorktreeRemoved(EntityId, WorktreeId),
  62    WorktreeReleased(EntityId, WorktreeId),
  63    WorktreeOrderChanged,
  64    WorktreeUpdateSent(Model<Worktree>),
  65}
  66
  67impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  68
  69impl WorktreeStore {
  70    pub fn init(client: &AnyProtoClient) {
  71        client.add_model_request_handler(Self::handle_create_project_entry);
  72        client.add_model_request_handler(Self::handle_copy_project_entry);
  73        client.add_model_request_handler(Self::handle_delete_project_entry);
  74        client.add_model_request_handler(Self::handle_expand_project_entry);
  75        client.add_model_request_handler(Self::handle_git_branches);
  76        client.add_model_request_handler(Self::handle_update_branch);
  77    }
  78
  79    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  80        Self {
  81            next_entry_id: Default::default(),
  82            loading_worktrees: Default::default(),
  83            downstream_client: None,
  84            worktrees: Vec::new(),
  85            worktrees_reordered: false,
  86            retain_worktrees,
  87            state: WorktreeStoreState::Local { fs },
  88        }
  89    }
  90
  91    pub fn remote(
  92        retain_worktrees: bool,
  93        upstream_client: AnyProtoClient,
  94        upstream_project_id: u64,
  95    ) -> Self {
  96        Self {
  97            next_entry_id: Default::default(),
  98            loading_worktrees: Default::default(),
  99            downstream_client: None,
 100            worktrees: Vec::new(),
 101            worktrees_reordered: false,
 102            retain_worktrees,
 103            state: WorktreeStoreState::Remote {
 104                upstream_client,
 105                upstream_project_id,
 106            },
 107        }
 108    }
 109
 110    /// Iterates through all worktrees, including ones that don't appear in the project panel
 111    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
 112        self.worktrees
 113            .iter()
 114            .filter_map(move |worktree| worktree.upgrade())
 115    }
 116
 117    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 118    pub fn visible_worktrees<'a>(
 119        &'a self,
 120        cx: &'a AppContext,
 121    ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
 122        self.worktrees()
 123            .filter(|worktree| worktree.read(cx).is_visible())
 124    }
 125
 126    pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
 127        self.worktrees()
 128            .find(|worktree| worktree.read(cx).id() == id)
 129    }
 130
 131    pub fn current_branch(&self, repository: ProjectPath, cx: &AppContext) -> Option<Arc<str>> {
 132        self.worktree_for_id(repository.worktree_id, cx)?
 133            .read(cx)
 134            .git_entry(repository.path)?
 135            .branch()
 136    }
 137
 138    pub fn worktree_for_entry(
 139        &self,
 140        entry_id: ProjectEntryId,
 141        cx: &AppContext,
 142    ) -> Option<Model<Worktree>> {
 143        self.worktrees()
 144            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 145    }
 146
 147    pub fn find_worktree(
 148        &self,
 149        abs_path: impl Into<SanitizedPath>,
 150        cx: &AppContext,
 151    ) -> Option<(Model<Worktree>, PathBuf)> {
 152        let abs_path: SanitizedPath = abs_path.into();
 153        for tree in self.worktrees() {
 154            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
 155                return Some((tree.clone(), relative_path.into()));
 156            }
 157        }
 158        None
 159    }
 160
 161    pub fn find_or_create_worktree(
 162        &mut self,
 163        abs_path: impl AsRef<Path>,
 164        visible: bool,
 165        cx: &mut ModelContext<Self>,
 166    ) -> Task<Result<(Model<Worktree>, PathBuf)>> {
 167        let abs_path = abs_path.as_ref();
 168        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 169            Task::ready(Ok((tree, relative_path)))
 170        } else {
 171            let worktree = self.create_worktree(abs_path, visible, cx);
 172            cx.background_executor()
 173                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 174        }
 175    }
 176
 177    pub fn entry_for_id<'a>(
 178        &'a self,
 179        entry_id: ProjectEntryId,
 180        cx: &'a AppContext,
 181    ) -> Option<&'a Entry> {
 182        self.worktrees()
 183            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 184    }
 185
 186    pub fn worktree_and_entry_for_id<'a>(
 187        &'a self,
 188        entry_id: ProjectEntryId,
 189        cx: &'a AppContext,
 190    ) -> Option<(Model<Worktree>, &'a Entry)> {
 191        self.worktrees().find_map(|worktree| {
 192            worktree
 193                .read(cx)
 194                .entry_for_id(entry_id)
 195                .map(|e| (worktree.clone(), e))
 196        })
 197    }
 198
 199    pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
 200        self.worktree_for_id(path.worktree_id, cx)?
 201            .read(cx)
 202            .entry_for_path(&path.path)
 203            .cloned()
 204    }
 205
 206    pub fn create_worktree(
 207        &mut self,
 208        abs_path: impl Into<SanitizedPath>,
 209        visible: bool,
 210        cx: &mut ModelContext<Self>,
 211    ) -> Task<Result<Model<Worktree>>> {
 212        let abs_path: SanitizedPath = abs_path.into();
 213        if !self.loading_worktrees.contains_key(&abs_path) {
 214            let task = match &self.state {
 215                WorktreeStoreState::Remote {
 216                    upstream_client, ..
 217                } => {
 218                    if upstream_client.is_via_collab() {
 219                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 220                    } else {
 221                        self.create_ssh_worktree(
 222                            upstream_client.clone(),
 223                            abs_path.clone(),
 224                            visible,
 225                            cx,
 226                        )
 227                    }
 228                }
 229                WorktreeStoreState::Local { fs } => {
 230                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 231                }
 232            };
 233
 234            self.loading_worktrees
 235                .insert(abs_path.clone(), task.shared());
 236        }
 237        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 238        cx.spawn(|this, mut cx| async move {
 239            let result = task.await;
 240            this.update(&mut cx, |this, _| this.loading_worktrees.remove(&abs_path))
 241                .ok();
 242            match result {
 243                Ok(worktree) => Ok(worktree),
 244                Err(err) => Err((*err).cloned()),
 245            }
 246        })
 247    }
 248
 249    fn create_ssh_worktree(
 250        &mut self,
 251        client: AnyProtoClient,
 252        abs_path: impl Into<SanitizedPath>,
 253        visible: bool,
 254        cx: &mut ModelContext<Self>,
 255    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 256        let mut abs_path = Into::<SanitizedPath>::into(abs_path).to_string();
 257        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 258        // in which case want to strip the leading the `/`.
 259        // On the host-side, the `~` will get expanded.
 260        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 261        if abs_path.starts_with("/~") {
 262            abs_path = abs_path[1..].to_string();
 263        }
 264        if abs_path.is_empty() || abs_path == "/" {
 265            abs_path = "~/".to_string();
 266        }
 267        cx.spawn(|this, mut cx| async move {
 268            let this = this.upgrade().context("Dropped worktree store")?;
 269
 270            let response = client
 271                .request(proto::AddWorktree {
 272                    project_id: SSH_PROJECT_ID,
 273                    path: abs_path.clone(),
 274                    visible,
 275                })
 276                .await?;
 277
 278            if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
 279                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 280            })? {
 281                return Ok(existing_worktree);
 282            }
 283
 284            let root_name = PathBuf::from(&response.canonicalized_path)
 285                .file_name()
 286                .map(|n| n.to_string_lossy().to_string())
 287                .unwrap_or(response.canonicalized_path.to_string());
 288
 289            let worktree = cx.update(|cx| {
 290                Worktree::remote(
 291                    SSH_PROJECT_ID,
 292                    0,
 293                    proto::WorktreeMetadata {
 294                        id: response.worktree_id,
 295                        root_name,
 296                        visible,
 297                        abs_path: response.canonicalized_path,
 298                    },
 299                    client,
 300                    cx,
 301                )
 302            })?;
 303
 304            this.update(&mut cx, |this, cx| {
 305                this.add(&worktree, cx);
 306            })?;
 307            Ok(worktree)
 308        })
 309    }
 310
 311    fn create_local_worktree(
 312        &mut self,
 313        fs: Arc<dyn Fs>,
 314        abs_path: impl Into<SanitizedPath>,
 315        visible: bool,
 316        cx: &mut ModelContext<Self>,
 317    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 318        let next_entry_id = self.next_entry_id.clone();
 319        let path: SanitizedPath = abs_path.into();
 320
 321        cx.spawn(move |this, mut cx| async move {
 322            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;
 323
 324            let worktree = worktree?;
 325            this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;
 326
 327            if visible {
 328                cx.update(|cx| {
 329                    cx.add_recent_document(path.as_path());
 330                })
 331                .log_err();
 332            }
 333
 334            Ok(worktree)
 335        })
 336    }
 337
 338    pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 339        let worktree_id = worktree.read(cx).id();
 340        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 341
 342        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 343        let handle = if push_strong_handle {
 344            WorktreeHandle::Strong(worktree.clone())
 345        } else {
 346            WorktreeHandle::Weak(worktree.downgrade())
 347        };
 348        if self.worktrees_reordered {
 349            self.worktrees.push(handle);
 350        } else {
 351            let i = match self
 352                .worktrees
 353                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 354                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 355                }) {
 356                Ok(i) | Err(i) => i,
 357            };
 358            self.worktrees.insert(i, handle);
 359        }
 360
 361        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 362        self.send_project_updates(cx);
 363
 364        let handle_id = worktree.entity_id();
 365        cx.observe_release(worktree, move |this, worktree, cx| {
 366            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 367                handle_id,
 368                worktree.id(),
 369            ));
 370            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 371                handle_id,
 372                worktree.id(),
 373            ));
 374            this.send_project_updates(cx);
 375        })
 376        .detach();
 377    }
 378
 379    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
 380        self.worktrees.retain(|worktree| {
 381            if let Some(worktree) = worktree.upgrade() {
 382                if worktree.read(cx).id() == id_to_remove {
 383                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 384                        worktree.entity_id(),
 385                        id_to_remove,
 386                    ));
 387                    false
 388                } else {
 389                    true
 390                }
 391            } else {
 392                false
 393            }
 394        });
 395        self.send_project_updates(cx);
 396    }
 397
 398    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 399        self.worktrees_reordered = worktrees_reordered;
 400    }
 401
 402    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 403        match &self.state {
 404            WorktreeStoreState::Remote {
 405                upstream_client,
 406                upstream_project_id,
 407                ..
 408            } => Some((upstream_client.clone(), *upstream_project_id)),
 409            WorktreeStoreState::Local { .. } => None,
 410        }
 411    }
 412
 413    pub fn set_worktrees_from_proto(
 414        &mut self,
 415        worktrees: Vec<proto::WorktreeMetadata>,
 416        replica_id: ReplicaId,
 417        cx: &mut ModelContext<Self>,
 418    ) -> Result<()> {
 419        let mut old_worktrees_by_id = self
 420            .worktrees
 421            .drain(..)
 422            .filter_map(|worktree| {
 423                let worktree = worktree.upgrade()?;
 424                Some((worktree.read(cx).id(), worktree))
 425            })
 426            .collect::<HashMap<_, _>>();
 427
 428        let (client, project_id) = self
 429            .upstream_client()
 430            .clone()
 431            .ok_or_else(|| anyhow!("invalid project"))?;
 432
 433        for worktree in worktrees {
 434            if let Some(old_worktree) =
 435                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 436            {
 437                let push_strong_handle =
 438                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 439                let handle = if push_strong_handle {
 440                    WorktreeHandle::Strong(old_worktree.clone())
 441                } else {
 442                    WorktreeHandle::Weak(old_worktree.downgrade())
 443                };
 444                self.worktrees.push(handle);
 445            } else {
 446                self.add(
 447                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 448                    cx,
 449                );
 450            }
 451        }
 452        self.send_project_updates(cx);
 453
 454        Ok(())
 455    }
 456
 457    pub fn move_worktree(
 458        &mut self,
 459        source: WorktreeId,
 460        destination: WorktreeId,
 461        cx: &mut ModelContext<Self>,
 462    ) -> Result<()> {
 463        if source == destination {
 464            return Ok(());
 465        }
 466
 467        let mut source_index = None;
 468        let mut destination_index = None;
 469        for (i, worktree) in self.worktrees.iter().enumerate() {
 470            if let Some(worktree) = worktree.upgrade() {
 471                let worktree_id = worktree.read(cx).id();
 472                if worktree_id == source {
 473                    source_index = Some(i);
 474                    if destination_index.is_some() {
 475                        break;
 476                    }
 477                } else if worktree_id == destination {
 478                    destination_index = Some(i);
 479                    if source_index.is_some() {
 480                        break;
 481                    }
 482                }
 483            }
 484        }
 485
 486        let source_index =
 487            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 488        let destination_index =
 489            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 490
 491        if source_index == destination_index {
 492            return Ok(());
 493        }
 494
 495        let worktree_to_move = self.worktrees.remove(source_index);
 496        self.worktrees.insert(destination_index, worktree_to_move);
 497        self.worktrees_reordered = true;
 498        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 499        cx.notify();
 500        Ok(())
 501    }
 502
 503    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 504        for worktree in &self.worktrees {
 505            if let Some(worktree) = worktree.upgrade() {
 506                worktree.update(cx, |worktree, _| {
 507                    if let Some(worktree) = worktree.as_remote_mut() {
 508                        worktree.disconnected_from_host();
 509                    }
 510                });
 511            }
 512        }
 513    }
 514
 515    pub fn send_project_updates(&mut self, cx: &mut ModelContext<Self>) {
 516        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 517            return;
 518        };
 519
 520        let update = proto::UpdateProject {
 521            project_id,
 522            worktrees: self.worktree_metadata_protos(cx),
 523        };
 524
 525        // collab has bad concurrency guarantees, so we send requests in serial.
 526        let update_project = if downstream_client.is_via_collab() {
 527            Some(downstream_client.request(update))
 528        } else {
 529            downstream_client.send(update).log_err();
 530            None
 531        };
 532        cx.spawn(|this, mut cx| async move {
 533            if let Some(update_project) = update_project {
 534                update_project.await?;
 535            }
 536
 537            this.update(&mut cx, |this, cx| {
 538                let worktrees = this.worktrees().collect::<Vec<_>>();
 539
 540                for worktree in worktrees {
 541                    worktree.update(cx, |worktree, cx| {
 542                        let client = downstream_client.clone();
 543                        worktree.observe_updates(project_id, cx, {
 544                            move |update| {
 545                                let client = client.clone();
 546                                async move {
 547                                    if client.is_via_collab() {
 548                                        client
 549                                            .request(update)
 550                                            .map(|result| result.log_err().is_some())
 551                                            .await
 552                                    } else {
 553                                        client.send(update).log_err().is_some()
 554                                    }
 555                                }
 556                            }
 557                        });
 558                    });
 559
 560                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 561                }
 562
 563                anyhow::Ok(())
 564            })
 565        })
 566        .detach_and_log_err(cx);
 567    }
 568
 569    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
 570        self.worktrees()
 571            .map(|worktree| {
 572                let worktree = worktree.read(cx);
 573                proto::WorktreeMetadata {
 574                    id: worktree.id().to_proto(),
 575                    root_name: worktree.root_name().into(),
 576                    visible: worktree.is_visible(),
 577                    abs_path: worktree.abs_path().to_string_lossy().into(),
 578                }
 579            })
 580            .collect()
 581    }
 582
 583    pub fn shared(
 584        &mut self,
 585        remote_id: u64,
 586        downsteam_client: AnyProtoClient,
 587        cx: &mut ModelContext<Self>,
 588    ) {
 589        self.retain_worktrees = true;
 590        self.downstream_client = Some((downsteam_client, remote_id));
 591
 592        // When shared, retain all worktrees
 593        for worktree_handle in self.worktrees.iter_mut() {
 594            match worktree_handle {
 595                WorktreeHandle::Strong(_) => {}
 596                WorktreeHandle::Weak(worktree) => {
 597                    if let Some(worktree) = worktree.upgrade() {
 598                        *worktree_handle = WorktreeHandle::Strong(worktree);
 599                    }
 600                }
 601            }
 602        }
 603        self.send_project_updates(cx);
 604    }
 605
 606    pub fn unshared(&mut self, cx: &mut ModelContext<Self>) {
 607        self.retain_worktrees = false;
 608        self.downstream_client.take();
 609
 610        // When not shared, only retain the visible worktrees
 611        for worktree_handle in self.worktrees.iter_mut() {
 612            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 613                let is_visible = worktree.update(cx, |worktree, _| {
 614                    worktree.stop_observing_updates();
 615                    worktree.is_visible()
 616                });
 617                if !is_visible {
 618                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 619                }
 620            }
 621        }
 622    }
 623
 624    /// search over all worktrees and return buffers that *might* match the search.
 625    pub fn find_search_candidates(
 626        &self,
 627        query: SearchQuery,
 628        limit: usize,
 629        open_entries: HashSet<ProjectEntryId>,
 630        fs: Arc<dyn Fs>,
 631        cx: &ModelContext<Self>,
 632    ) -> Receiver<ProjectPath> {
 633        let snapshots = self
 634            .visible_worktrees(cx)
 635            .filter_map(|tree| {
 636                let tree = tree.read(cx);
 637                Some((tree.snapshot(), tree.as_local()?.settings()))
 638            })
 639            .collect::<Vec<_>>();
 640
 641        let executor = cx.background_executor().clone();
 642
 643        // We want to return entries in the order they are in the worktrees, so we have one
 644        // thread that iterates over the worktrees (and ignored directories) as necessary,
 645        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 646        // channel.
 647        // We spawn a number of workers that take items from the filter channel and check the query
 648        // against the version of the file on disk.
 649        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 650        let (output_tx, mut output_rx) = smol::channel::bounded(64);
 651        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 652
 653        let input = cx.background_executor().spawn({
 654            let fs = fs.clone();
 655            let query = query.clone();
 656            async move {
 657                Self::find_candidate_paths(
 658                    fs,
 659                    snapshots,
 660                    open_entries,
 661                    query,
 662                    filter_tx,
 663                    output_tx,
 664                )
 665                .await
 666                .log_err();
 667            }
 668        });
 669        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 670        let filters = cx.background_executor().spawn(async move {
 671            let fs = &fs;
 672            let query = &query;
 673            executor
 674                .scoped(move |scope| {
 675                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 676                        let filter_rx = filter_rx.clone();
 677                        scope.spawn(async move {
 678                            Self::filter_paths(fs, filter_rx, query).await.log_err();
 679                        })
 680                    }
 681                })
 682                .await;
 683        });
 684        cx.background_executor()
 685            .spawn(async move {
 686                let mut matched = 0;
 687                while let Some(mut receiver) = output_rx.next().await {
 688                    let Some(path) = receiver.next().await else {
 689                        continue;
 690                    };
 691                    let Ok(_) = matching_paths_tx.send(path).await else {
 692                        break;
 693                    };
 694                    matched += 1;
 695                    if matched == limit {
 696                        break;
 697                    }
 698                }
 699                drop(input);
 700                drop(filters);
 701            })
 702            .detach();
 703        matching_paths_rx
 704    }
 705
 706    fn scan_ignored_dir<'a>(
 707        fs: &'a Arc<dyn Fs>,
 708        snapshot: &'a worktree::Snapshot,
 709        path: &'a Path,
 710        query: &'a SearchQuery,
 711        include_root: bool,
 712        filter_tx: &'a Sender<MatchingEntry>,
 713        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 714    ) -> BoxFuture<'a, Result<()>> {
 715        async move {
 716            let abs_path = snapshot.abs_path().join(path);
 717            let Some(mut files) = fs
 718                .read_dir(&abs_path)
 719                .await
 720                .with_context(|| format!("listing ignored path {abs_path:?}"))
 721                .log_err()
 722            else {
 723                return Ok(());
 724            };
 725
 726            let mut results = Vec::new();
 727
 728            while let Some(Ok(file)) = files.next().await {
 729                let Some(metadata) = fs
 730                    .metadata(&file)
 731                    .await
 732                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 733                    .log_err()
 734                    .flatten()
 735                else {
 736                    continue;
 737                };
 738                if metadata.is_symlink || metadata.is_fifo {
 739                    continue;
 740                }
 741                results.push((
 742                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 743                    !metadata.is_dir,
 744                ))
 745            }
 746            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
 747            for (path, is_file) in results {
 748                if is_file {
 749                    if query.filters_path() {
 750                        let matched_path = if include_root {
 751                            let mut full_path = PathBuf::from(snapshot.root_name());
 752                            full_path.push(&path);
 753                            query.file_matches(&full_path)
 754                        } else {
 755                            query.file_matches(&path)
 756                        };
 757                        if !matched_path {
 758                            continue;
 759                        }
 760                    }
 761                    let (tx, rx) = oneshot::channel();
 762                    output_tx.send(rx).await?;
 763                    filter_tx
 764                        .send(MatchingEntry {
 765                            respond: tx,
 766                            worktree_path: snapshot.abs_path().clone(),
 767                            path: ProjectPath {
 768                                worktree_id: snapshot.id(),
 769                                path: Arc::from(path),
 770                            },
 771                        })
 772                        .await?;
 773                } else {
 774                    Self::scan_ignored_dir(
 775                        fs,
 776                        snapshot,
 777                        &path,
 778                        query,
 779                        include_root,
 780                        filter_tx,
 781                        output_tx,
 782                    )
 783                    .await?;
 784                }
 785            }
 786            Ok(())
 787        }
 788        .boxed()
 789    }
 790
 791    async fn find_candidate_paths(
 792        fs: Arc<dyn Fs>,
 793        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 794        open_entries: HashSet<ProjectEntryId>,
 795        query: SearchQuery,
 796        filter_tx: Sender<MatchingEntry>,
 797        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 798    ) -> Result<()> {
 799        let include_root = snapshots.len() > 1;
 800        for (snapshot, settings) in snapshots {
 801            for entry in snapshot.entries(query.include_ignored(), 0) {
 802                if entry.is_dir() && entry.is_ignored {
 803                    if !settings.is_path_excluded(&entry.path) {
 804                        Self::scan_ignored_dir(
 805                            &fs,
 806                            &snapshot,
 807                            &entry.path,
 808                            &query,
 809                            include_root,
 810                            &filter_tx,
 811                            &output_tx,
 812                        )
 813                        .await?;
 814                    }
 815                    continue;
 816                }
 817
 818                if entry.is_fifo || !entry.is_file() {
 819                    continue;
 820                }
 821
 822                if query.filters_path() {
 823                    let matched_path = if include_root {
 824                        let mut full_path = PathBuf::from(snapshot.root_name());
 825                        full_path.push(&entry.path);
 826                        query.file_matches(&full_path)
 827                    } else {
 828                        query.file_matches(&entry.path)
 829                    };
 830                    if !matched_path {
 831                        continue;
 832                    }
 833                }
 834
 835                let (mut tx, rx) = oneshot::channel();
 836
 837                if open_entries.contains(&entry.id) {
 838                    tx.send(ProjectPath {
 839                        worktree_id: snapshot.id(),
 840                        path: entry.path.clone(),
 841                    })
 842                    .await?;
 843                } else {
 844                    filter_tx
 845                        .send(MatchingEntry {
 846                            respond: tx,
 847                            worktree_path: snapshot.abs_path().clone(),
 848                            path: ProjectPath {
 849                                worktree_id: snapshot.id(),
 850                                path: entry.path.clone(),
 851                            },
 852                        })
 853                        .await?;
 854                }
 855
 856                output_tx.send(rx).await?;
 857            }
 858        }
 859        Ok(())
 860    }
 861
 862    pub fn branches(
 863        &self,
 864        project_path: ProjectPath,
 865        cx: &AppContext,
 866    ) -> Task<Result<Vec<git::repository::Branch>>> {
 867        let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
 868            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 869        };
 870
 871        match worktree.read(cx) {
 872            Worktree::Local(local_worktree) => {
 873                let branches = util::maybe!({
 874                    let worktree_error = |error| {
 875                        format!(
 876                            "{} for worktree {}",
 877                            error,
 878                            local_worktree.abs_path().to_string_lossy()
 879                        )
 880                    };
 881
 882                    let entry = local_worktree
 883                        .git_entry(project_path.path)
 884                        .with_context(|| worktree_error("No git entry found"))?;
 885
 886                    let repo = local_worktree
 887                        .get_local_repo(&entry)
 888                        .with_context(|| worktree_error("No repository found"))?
 889                        .repo()
 890                        .clone();
 891
 892                    repo.branches()
 893                });
 894
 895                Task::ready(branches)
 896            }
 897            Worktree::Remote(remote_worktree) => {
 898                let request = remote_worktree.client().request(proto::GitBranches {
 899                    project_id: remote_worktree.project_id(),
 900                    repository: Some(proto::ProjectPath {
 901                        worktree_id: project_path.worktree_id.to_proto(),
 902                        path: project_path.path.to_string_lossy().to_string(), // Root path
 903                    }),
 904                });
 905
 906                cx.background_executor().spawn(async move {
 907                    let response = request.await?;
 908
 909                    let branches = response
 910                        .branches
 911                        .into_iter()
 912                        .map(|proto_branch| git::repository::Branch {
 913                            is_head: proto_branch.is_head,
 914                            name: proto_branch.name.into(),
 915                            unix_timestamp: proto_branch
 916                                .unix_timestamp
 917                                .map(|timestamp| timestamp as i64),
 918                        })
 919                        .collect();
 920
 921                    Ok(branches)
 922                })
 923            }
 924        }
 925    }
 926
 927    pub fn update_or_create_branch(
 928        &self,
 929        repository: ProjectPath,
 930        new_branch: String,
 931        cx: &AppContext,
 932    ) -> Task<Result<()>> {
 933        let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
 934            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 935        };
 936
 937        match worktree.read(cx) {
 938            Worktree::Local(local_worktree) => {
 939                let result = util::maybe!({
 940                    let worktree_error = |error| {
 941                        format!(
 942                            "{} for worktree {}",
 943                            error,
 944                            local_worktree.abs_path().to_string_lossy()
 945                        )
 946                    };
 947
 948                    let entry = local_worktree
 949                        .git_entry(repository.path)
 950                        .with_context(|| worktree_error("No git entry found"))?;
 951
 952                    let repo = local_worktree
 953                        .get_local_repo(&entry)
 954                        .with_context(|| worktree_error("No repository found"))?
 955                        .repo()
 956                        .clone();
 957
 958                    if !repo.branch_exits(&new_branch)? {
 959                        repo.create_branch(&new_branch)?;
 960                    }
 961
 962                    repo.change_branch(&new_branch)?;
 963
 964                    Ok(())
 965                });
 966
 967                Task::ready(result)
 968            }
 969            Worktree::Remote(remote_worktree) => {
 970                let request = remote_worktree.client().request(proto::UpdateGitBranch {
 971                    project_id: remote_worktree.project_id(),
 972                    repository: Some(proto::ProjectPath {
 973                        worktree_id: repository.worktree_id.to_proto(),
 974                        path: repository.path.to_string_lossy().to_string(), // Root path
 975                    }),
 976                    branch_name: new_branch,
 977                });
 978
 979                cx.background_executor().spawn(async move {
 980                    request.await?;
 981                    Ok(())
 982                })
 983            }
 984        }
 985    }
 986
 987    async fn filter_paths(
 988        fs: &Arc<dyn Fs>,
 989        mut input: Receiver<MatchingEntry>,
 990        query: &SearchQuery,
 991    ) -> Result<()> {
 992        while let Some(mut entry) = input.next().await {
 993            let abs_path = entry.worktree_path.join(&entry.path.path);
 994            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
 995                continue;
 996            };
 997            if query.detect(file).unwrap_or(false) {
 998                entry.respond.send(entry.path).await?
 999            }
1000        }
1001
1002        Ok(())
1003    }
1004
1005    pub async fn handle_create_project_entry(
1006        this: Model<Self>,
1007        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1008        mut cx: AsyncAppContext,
1009    ) -> Result<proto::ProjectEntryResponse> {
1010        let worktree = this.update(&mut cx, |this, cx| {
1011            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1012            this.worktree_for_id(worktree_id, cx)
1013                .ok_or_else(|| anyhow!("worktree not found"))
1014        })??;
1015        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1016    }
1017
1018    pub async fn handle_rename_project_entry(
1019        this: Model<super::Project>,
1020        envelope: TypedEnvelope<proto::RenameProjectEntry>,
1021        mut cx: AsyncAppContext,
1022    ) -> Result<proto::ProjectEntryResponse> {
1023        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1024        let (worktree_id, worktree, old_path, is_dir) = this
1025            .update(&mut cx, |this, cx| {
1026                this.worktree_store
1027                    .read(cx)
1028                    .worktree_and_entry_for_id(entry_id, cx)
1029                    .map(|(worktree, entry)| {
1030                        (
1031                            worktree.read(cx).id(),
1032                            worktree,
1033                            entry.path.clone(),
1034                            entry.is_dir(),
1035                        )
1036                    })
1037            })?
1038            .ok_or_else(|| anyhow!("worktree not found"))?;
1039        let (old_abs_path, new_abs_path) = {
1040            let root_path = worktree.update(&mut cx, |this, _| this.abs_path())?;
1041            (
1042                root_path.join(&old_path),
1043                root_path.join(&envelope.payload.new_path),
1044            )
1045        };
1046        let lsp_store = this
1047            .update(&mut cx, |this, _| this.lsp_store())?
1048            .downgrade();
1049        LspStore::will_rename_entry(
1050            lsp_store,
1051            worktree_id,
1052            &old_abs_path,
1053            &new_abs_path,
1054            is_dir,
1055            cx.clone(),
1056        )
1057        .await;
1058        let response = Worktree::handle_rename_entry(worktree, envelope.payload, cx.clone()).await;
1059        this.update(&mut cx, |this, cx| {
1060            this.lsp_store().read(cx).did_rename_entry(
1061                worktree_id,
1062                &old_abs_path,
1063                &new_abs_path,
1064                is_dir,
1065            );
1066        })
1067        .ok();
1068        response
1069    }
1070
1071    pub async fn handle_copy_project_entry(
1072        this: Model<Self>,
1073        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1074        mut cx: AsyncAppContext,
1075    ) -> Result<proto::ProjectEntryResponse> {
1076        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1077        let worktree = this.update(&mut cx, |this, cx| {
1078            this.worktree_for_entry(entry_id, cx)
1079                .ok_or_else(|| anyhow!("worktree not found"))
1080        })??;
1081        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
1082    }
1083
1084    pub async fn handle_delete_project_entry(
1085        this: Model<Self>,
1086        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1087        mut cx: AsyncAppContext,
1088    ) -> Result<proto::ProjectEntryResponse> {
1089        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1090        let worktree = this.update(&mut cx, |this, cx| {
1091            this.worktree_for_entry(entry_id, cx)
1092                .ok_or_else(|| anyhow!("worktree not found"))
1093        })??;
1094        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1095    }
1096
1097    pub async fn handle_expand_project_entry(
1098        this: Model<Self>,
1099        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1100        mut cx: AsyncAppContext,
1101    ) -> Result<proto::ExpandProjectEntryResponse> {
1102        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1103        let worktree = this
1104            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1105            .ok_or_else(|| anyhow!("invalid request"))?;
1106        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1107    }
1108
1109    pub async fn handle_git_branches(
1110        this: Model<Self>,
1111        branches: TypedEnvelope<proto::GitBranches>,
1112        cx: AsyncAppContext,
1113    ) -> Result<proto::GitBranchesResponse> {
1114        let project_path = branches
1115            .payload
1116            .repository
1117            .clone()
1118            .context("Invalid GitBranches call")?;
1119        let project_path = ProjectPath {
1120            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1121            path: Path::new(&project_path.path).into(),
1122        };
1123
1124        let branches = this
1125            .read_with(&cx, |this, cx| this.branches(project_path, cx))?
1126            .await?;
1127
1128        Ok(proto::GitBranchesResponse {
1129            branches: branches
1130                .into_iter()
1131                .map(|branch| proto::Branch {
1132                    is_head: branch.is_head,
1133                    name: branch.name.to_string(),
1134                    unix_timestamp: branch.unix_timestamp.map(|timestamp| timestamp as u64),
1135                })
1136                .collect(),
1137        })
1138    }
1139
1140    pub async fn handle_update_branch(
1141        this: Model<Self>,
1142        update_branch: TypedEnvelope<proto::UpdateGitBranch>,
1143        cx: AsyncAppContext,
1144    ) -> Result<proto::Ack> {
1145        let project_path = update_branch
1146            .payload
1147            .repository
1148            .clone()
1149            .context("Invalid GitBranches call")?;
1150        let project_path = ProjectPath {
1151            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1152            path: Path::new(&project_path.path).into(),
1153        };
1154        let new_branch = update_branch.payload.branch_name;
1155
1156        this.read_with(&cx, |this, cx| {
1157            this.update_or_create_branch(project_path, new_branch, cx)
1158        })?
1159        .await?;
1160
1161        Ok(proto::Ack {})
1162    }
1163}
1164
1165#[derive(Clone, Debug)]
1166enum WorktreeHandle {
1167    Strong(Model<Worktree>),
1168    Weak(WeakModel<Worktree>),
1169}
1170
1171impl WorktreeHandle {
1172    fn upgrade(&self) -> Option<Model<Worktree>> {
1173        match self {
1174            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1175            WorktreeHandle::Weak(handle) => handle.upgrade(),
1176        }
1177    }
1178}