worktree_store.rs

   1use std::{
   2    path::{Path, PathBuf},
   3    sync::{atomic::AtomicUsize, Arc},
   4};
   5
   6use anyhow::{anyhow, Context as _, Result};
   7use collections::{HashMap, HashSet};
   8use fs::Fs;
   9use futures::{
  10    future::{BoxFuture, Shared},
  11    FutureExt, SinkExt,
  12};
  13use gpui::{
  14    AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, Task, WeakModel,
  15};
  16use postage::oneshot;
  17use rpc::{
  18    proto::{self, SSH_PROJECT_ID},
  19    AnyProtoClient, ErrorExt, TypedEnvelope,
  20};
  21use smol::{
  22    channel::{Receiver, Sender},
  23    stream::StreamExt,
  24};
  25use text::ReplicaId;
  26use util::{paths::SanitizedPath, ResultExt};
  27use worktree::{Entry, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
  28
  29use crate::{search::SearchQuery, LspStore, ProjectPath};
  30
/// A candidate file handed to the search filter workers.
struct MatchingEntry {
    /// Absolute path of the worktree root the candidate belongs to.
    worktree_path: Arc<Path>,
    /// Project-relative location of the candidate file.
    path: ProjectPath,
    /// One-shot sender whose receiving half is queued on the ordered output channel.
    respond: oneshot::Sender<ProjectPath>,
}
  36
/// Where the worktrees managed by a [`WorktreeStore`] come from.
enum WorktreeStoreState {
    /// Worktrees are created directly on top of a file system.
    Local {
        /// File system handed to newly created local worktrees.
        fs: Arc<dyn Fs>,
    },
    /// Worktrees are mirrored from an upstream peer.
    Remote {
        /// Client used to talk to the upstream peer.
        upstream_client: AnyProtoClient,
        /// Project id used when constructing remote worktrees from upstream metadata.
        upstream_project_id: u64,
    },
}
  46
/// Owns the set of worktrees that belong to a project.
pub struct WorktreeStore {
    /// Entry-id counter passed to every local worktree created by this store.
    next_entry_id: Arc<AtomicUsize>,
    /// Client and project id that project updates are forwarded to, when set.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// When `true`, every worktree is held strongly; otherwise only visible ones are.
    retain_worktrees: bool,
    /// Strong or weak handles to the known worktrees, in display order.
    worktrees: Vec<WorktreeHandle>,
    /// When `true`, new worktrees are appended instead of inserted by absolute path.
    worktrees_reordered: bool,
    /// In-flight worktree creations keyed by path, shared so that concurrent requests
    /// for the same path await the same task.
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<SanitizedPath, Shared<Task<Result<Model<Worktree>, Arc<anyhow::Error>>>>>,
    /// Whether the store is backed by a local file system or by an upstream peer.
    state: WorktreeStoreState,
}
  58
/// Events emitted by [`WorktreeStore`].
pub enum WorktreeStoreEvent {
    /// A worktree was registered with the store.
    WorktreeAdded(Model<Worktree>),
    /// A worktree was removed from the store, or its model was released.
    WorktreeRemoved(EntityId, WorktreeId),
    /// A worktree model tracked by the store was released.
    WorktreeReleased(EntityId, WorktreeId),
    /// A worktree was moved to a different position in the list.
    WorktreeOrderChanged,
    /// Update observation towards the downstream client was (re)installed for the worktree.
    WorktreeUpdateSent(Model<Worktree>),
}
  66
// Allows `WorktreeStore` to emit `WorktreeStoreEvent`s through its model context.
impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  68
  69impl WorktreeStore {
    /// Registers this store's model request handlers on `client`.
    ///
    /// One handler each is registered for creating, copying, deleting and expanding a
    /// project entry, for listing git branches, and for updating a branch.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_request_handler(Self::handle_create_project_entry);
        client.add_model_request_handler(Self::handle_copy_project_entry);
        client.add_model_request_handler(Self::handle_delete_project_entry);
        client.add_model_request_handler(Self::handle_expand_project_entry);
        client.add_model_request_handler(Self::handle_git_branches);
        client.add_model_request_handler(Self::handle_update_branch);
    }
  78
  79    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  80        Self {
  81            next_entry_id: Default::default(),
  82            loading_worktrees: Default::default(),
  83            downstream_client: None,
  84            worktrees: Vec::new(),
  85            worktrees_reordered: false,
  86            retain_worktrees,
  87            state: WorktreeStoreState::Local { fs },
  88        }
  89    }
  90
  91    pub fn remote(
  92        retain_worktrees: bool,
  93        upstream_client: AnyProtoClient,
  94        upstream_project_id: u64,
  95    ) -> Self {
  96        Self {
  97            next_entry_id: Default::default(),
  98            loading_worktrees: Default::default(),
  99            downstream_client: None,
 100            worktrees: Vec::new(),
 101            worktrees_reordered: false,
 102            retain_worktrees,
 103            state: WorktreeStoreState::Remote {
 104                upstream_client,
 105                upstream_project_id,
 106            },
 107        }
 108    }
 109
 110    /// Iterates through all worktrees, including ones that don't appear in the project panel
 111    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
 112        self.worktrees
 113            .iter()
 114            .filter_map(move |worktree| worktree.upgrade())
 115    }
 116
 117    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 118    pub fn visible_worktrees<'a>(
 119        &'a self,
 120        cx: &'a AppContext,
 121    ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
 122        self.worktrees()
 123            .filter(|worktree| worktree.read(cx).is_visible())
 124    }
 125
 126    pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
 127        self.worktrees()
 128            .find(|worktree| worktree.read(cx).id() == id)
 129    }
 130
 131    pub fn current_branch(&self, repository: ProjectPath, cx: &AppContext) -> Option<Arc<str>> {
 132        self.worktree_for_id(repository.worktree_id, cx)?
 133            .read(cx)
 134            .git_entry(repository.path)?
 135            .branch()
 136    }
 137
 138    pub fn worktree_for_entry(
 139        &self,
 140        entry_id: ProjectEntryId,
 141        cx: &AppContext,
 142    ) -> Option<Model<Worktree>> {
 143        self.worktrees()
 144            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 145    }
 146
 147    pub fn find_worktree(
 148        &self,
 149        abs_path: impl Into<SanitizedPath>,
 150        cx: &AppContext,
 151    ) -> Option<(Model<Worktree>, PathBuf)> {
 152        let abs_path: SanitizedPath = abs_path.into();
 153        for tree in self.worktrees() {
 154            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
 155                return Some((tree.clone(), relative_path.into()));
 156            }
 157        }
 158        None
 159    }
 160
 161    pub fn find_or_create_worktree(
 162        &mut self,
 163        abs_path: impl AsRef<Path>,
 164        visible: bool,
 165        cx: &mut ModelContext<Self>,
 166    ) -> Task<Result<(Model<Worktree>, PathBuf)>> {
 167        let abs_path = abs_path.as_ref();
 168        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 169            Task::ready(Ok((tree, relative_path)))
 170        } else {
 171            let worktree = self.create_worktree(abs_path, visible, cx);
 172            cx.background_executor()
 173                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 174        }
 175    }
 176
 177    pub fn entry_for_id<'a>(
 178        &'a self,
 179        entry_id: ProjectEntryId,
 180        cx: &'a AppContext,
 181    ) -> Option<&'a Entry> {
 182        self.worktrees()
 183            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 184    }
 185
 186    pub fn worktree_and_entry_for_id<'a>(
 187        &'a self,
 188        entry_id: ProjectEntryId,
 189        cx: &'a AppContext,
 190    ) -> Option<(Model<Worktree>, &'a Entry)> {
 191        self.worktrees().find_map(|worktree| {
 192            worktree
 193                .read(cx)
 194                .entry_for_id(entry_id)
 195                .map(|e| (worktree.clone(), e))
 196        })
 197    }
 198
 199    pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
 200        self.worktree_for_id(path.worktree_id, cx)?
 201            .read(cx)
 202            .entry_for_path(&path.path)
 203            .cloned()
 204    }
 205
 206    pub fn create_worktree(
 207        &mut self,
 208        abs_path: impl Into<SanitizedPath>,
 209        visible: bool,
 210        cx: &mut ModelContext<Self>,
 211    ) -> Task<Result<Model<Worktree>>> {
 212        let abs_path: SanitizedPath = abs_path.into();
 213        if !self.loading_worktrees.contains_key(&abs_path) {
 214            let task = match &self.state {
 215                WorktreeStoreState::Remote {
 216                    upstream_client, ..
 217                } => {
 218                    if upstream_client.is_via_collab() {
 219                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 220                    } else {
 221                        self.create_ssh_worktree(
 222                            upstream_client.clone(),
 223                            abs_path.clone(),
 224                            visible,
 225                            cx,
 226                        )
 227                    }
 228                }
 229                WorktreeStoreState::Local { fs } => {
 230                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 231                }
 232            };
 233
 234            self.loading_worktrees
 235                .insert(abs_path.clone(), task.shared());
 236        }
 237        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 238        cx.spawn(|this, mut cx| async move {
 239            let result = task.await;
 240            this.update(&mut cx, |this, _| this.loading_worktrees.remove(&abs_path))
 241                .ok();
 242            match result {
 243                Ok(worktree) => Ok(worktree),
 244                Err(err) => Err((*err).cloned()),
 245            }
 246        })
 247    }
 248
 249    fn create_ssh_worktree(
 250        &mut self,
 251        client: AnyProtoClient,
 252        abs_path: impl Into<SanitizedPath>,
 253        visible: bool,
 254        cx: &mut ModelContext<Self>,
 255    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 256        let mut abs_path = Into::<SanitizedPath>::into(abs_path).to_string();
 257        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 258        // in which case want to strip the leading the `/`.
 259        // On the host-side, the `~` will get expanded.
 260        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 261        if abs_path.starts_with("/~") {
 262            abs_path = abs_path[1..].to_string();
 263        }
 264        if abs_path.is_empty() || abs_path == "/" {
 265            abs_path = "~/".to_string();
 266        }
 267        cx.spawn(|this, mut cx| async move {
 268            let this = this.upgrade().context("Dropped worktree store")?;
 269
 270            let response = client
 271                .request(proto::AddWorktree {
 272                    project_id: SSH_PROJECT_ID,
 273                    path: abs_path.clone(),
 274                    visible,
 275                })
 276                .await?;
 277
 278            if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
 279                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 280            })? {
 281                return Ok(existing_worktree);
 282            }
 283
 284            let root_name = PathBuf::from(&response.canonicalized_path)
 285                .file_name()
 286                .map(|n| n.to_string_lossy().to_string())
 287                .unwrap_or(response.canonicalized_path.to_string());
 288
 289            let worktree = cx.update(|cx| {
 290                Worktree::remote(
 291                    SSH_PROJECT_ID,
 292                    0,
 293                    proto::WorktreeMetadata {
 294                        id: response.worktree_id,
 295                        root_name,
 296                        visible,
 297                        abs_path: response.canonicalized_path,
 298                    },
 299                    client,
 300                    cx,
 301                )
 302            })?;
 303
 304            this.update(&mut cx, |this, cx| {
 305                this.add(&worktree, cx);
 306            })?;
 307            Ok(worktree)
 308        })
 309    }
 310
 311    fn create_local_worktree(
 312        &mut self,
 313        fs: Arc<dyn Fs>,
 314        abs_path: impl Into<SanitizedPath>,
 315        visible: bool,
 316        cx: &mut ModelContext<Self>,
 317    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 318        let next_entry_id = self.next_entry_id.clone();
 319        let path: SanitizedPath = abs_path.into();
 320
 321        cx.spawn(move |this, mut cx| async move {
 322            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;
 323
 324            let worktree = worktree?;
 325
 326            this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;
 327
 328            if visible {
 329                cx.update(|cx| {
 330                    cx.add_recent_document(path.as_path());
 331                })
 332                .log_err();
 333            }
 334
 335            Ok(worktree)
 336        })
 337    }
 338
 339    pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 340        let worktree_id = worktree.read(cx).id();
 341        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 342
 343        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 344        let handle = if push_strong_handle {
 345            WorktreeHandle::Strong(worktree.clone())
 346        } else {
 347            WorktreeHandle::Weak(worktree.downgrade())
 348        };
 349        if self.worktrees_reordered {
 350            self.worktrees.push(handle);
 351        } else {
 352            let i = match self
 353                .worktrees
 354                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 355                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 356                }) {
 357                Ok(i) | Err(i) => i,
 358            };
 359            self.worktrees.insert(i, handle);
 360        }
 361
 362        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 363        self.send_project_updates(cx);
 364
 365        let handle_id = worktree.entity_id();
 366        cx.observe_release(worktree, move |this, worktree, cx| {
 367            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 368                handle_id,
 369                worktree.id(),
 370            ));
 371            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 372                handle_id,
 373                worktree.id(),
 374            ));
 375            this.send_project_updates(cx);
 376        })
 377        .detach();
 378    }
 379
 380    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
 381        self.worktrees.retain(|worktree| {
 382            if let Some(worktree) = worktree.upgrade() {
 383                if worktree.read(cx).id() == id_to_remove {
 384                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 385                        worktree.entity_id(),
 386                        id_to_remove,
 387                    ));
 388                    false
 389                } else {
 390                    true
 391                }
 392            } else {
 393                false
 394            }
 395        });
 396        self.send_project_updates(cx);
 397    }
 398
    /// Records whether the worktree list has been manually reordered.
    ///
    /// While the flag is set, newly added worktrees are appended to the list instead of
    /// being inserted by absolute path.
    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }
 402
 403    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 404        match &self.state {
 405            WorktreeStoreState::Remote {
 406                upstream_client,
 407                upstream_project_id,
 408                ..
 409            } => Some((upstream_client.clone(), *upstream_project_id)),
 410            WorktreeStoreState::Local { .. } => None,
 411        }
 412    }
 413
 414    pub fn set_worktrees_from_proto(
 415        &mut self,
 416        worktrees: Vec<proto::WorktreeMetadata>,
 417        replica_id: ReplicaId,
 418        cx: &mut ModelContext<Self>,
 419    ) -> Result<()> {
 420        let mut old_worktrees_by_id = self
 421            .worktrees
 422            .drain(..)
 423            .filter_map(|worktree| {
 424                let worktree = worktree.upgrade()?;
 425                Some((worktree.read(cx).id(), worktree))
 426            })
 427            .collect::<HashMap<_, _>>();
 428
 429        let (client, project_id) = self
 430            .upstream_client()
 431            .clone()
 432            .ok_or_else(|| anyhow!("invalid project"))?;
 433
 434        for worktree in worktrees {
 435            if let Some(old_worktree) =
 436                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 437            {
 438                let push_strong_handle =
 439                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 440                let handle = if push_strong_handle {
 441                    WorktreeHandle::Strong(old_worktree.clone())
 442                } else {
 443                    WorktreeHandle::Weak(old_worktree.downgrade())
 444                };
 445                self.worktrees.push(handle);
 446            } else {
 447                self.add(
 448                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 449                    cx,
 450                );
 451            }
 452        }
 453        self.send_project_updates(cx);
 454
 455        Ok(())
 456    }
 457
 458    pub fn move_worktree(
 459        &mut self,
 460        source: WorktreeId,
 461        destination: WorktreeId,
 462        cx: &mut ModelContext<Self>,
 463    ) -> Result<()> {
 464        if source == destination {
 465            return Ok(());
 466        }
 467
 468        let mut source_index = None;
 469        let mut destination_index = None;
 470        for (i, worktree) in self.worktrees.iter().enumerate() {
 471            if let Some(worktree) = worktree.upgrade() {
 472                let worktree_id = worktree.read(cx).id();
 473                if worktree_id == source {
 474                    source_index = Some(i);
 475                    if destination_index.is_some() {
 476                        break;
 477                    }
 478                } else if worktree_id == destination {
 479                    destination_index = Some(i);
 480                    if source_index.is_some() {
 481                        break;
 482                    }
 483                }
 484            }
 485        }
 486
 487        let source_index =
 488            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 489        let destination_index =
 490            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 491
 492        if source_index == destination_index {
 493            return Ok(());
 494        }
 495
 496        let worktree_to_move = self.worktrees.remove(source_index);
 497        self.worktrees.insert(destination_index, worktree_to_move);
 498        self.worktrees_reordered = true;
 499        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 500        cx.notify();
 501        Ok(())
 502    }
 503
 504    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 505        for worktree in &self.worktrees {
 506            if let Some(worktree) = worktree.upgrade() {
 507                worktree.update(cx, |worktree, _| {
 508                    if let Some(worktree) = worktree.as_remote_mut() {
 509                        worktree.disconnected_from_host();
 510                    }
 511                });
 512            }
 513        }
 514    }
 515
 516    pub fn send_project_updates(&mut self, cx: &mut ModelContext<Self>) {
 517        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 518            return;
 519        };
 520
 521        let update = proto::UpdateProject {
 522            project_id,
 523            worktrees: self.worktree_metadata_protos(cx),
 524        };
 525
 526        // collab has bad concurrency guarantees, so we send requests in serial.
 527        let update_project = if downstream_client.is_via_collab() {
 528            Some(downstream_client.request(update))
 529        } else {
 530            downstream_client.send(update).log_err();
 531            None
 532        };
 533        cx.spawn(|this, mut cx| async move {
 534            if let Some(update_project) = update_project {
 535                update_project.await?;
 536            }
 537
 538            this.update(&mut cx, |this, cx| {
 539                let worktrees = this.worktrees().collect::<Vec<_>>();
 540
 541                for worktree in worktrees {
 542                    worktree.update(cx, |worktree, cx| {
 543                        let client = downstream_client.clone();
 544                        worktree.observe_updates(project_id, cx, {
 545                            move |update| {
 546                                let client = client.clone();
 547                                async move {
 548                                    if client.is_via_collab() {
 549                                        client
 550                                            .request(update)
 551                                            .map(|result| result.log_err().is_some())
 552                                            .await
 553                                    } else {
 554                                        client.send(update).log_err().is_some()
 555                                    }
 556                                }
 557                            }
 558                        });
 559                    });
 560
 561                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 562                }
 563
 564                anyhow::Ok(())
 565            })
 566        })
 567        .detach_and_log_err(cx);
 568    }
 569
 570    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
 571        self.worktrees()
 572            .map(|worktree| {
 573                let worktree = worktree.read(cx);
 574                proto::WorktreeMetadata {
 575                    id: worktree.id().to_proto(),
 576                    root_name: worktree.root_name().into(),
 577                    visible: worktree.is_visible(),
 578                    abs_path: worktree.abs_path().to_string_lossy().into(),
 579                }
 580            })
 581            .collect()
 582    }
 583
 584    pub fn shared(
 585        &mut self,
 586        remote_id: u64,
 587        downstream_client: AnyProtoClient,
 588        cx: &mut ModelContext<Self>,
 589    ) {
 590        self.retain_worktrees = true;
 591        self.downstream_client = Some((downstream_client, remote_id));
 592
 593        // When shared, retain all worktrees
 594        for worktree_handle in self.worktrees.iter_mut() {
 595            match worktree_handle {
 596                WorktreeHandle::Strong(_) => {}
 597                WorktreeHandle::Weak(worktree) => {
 598                    if let Some(worktree) = worktree.upgrade() {
 599                        *worktree_handle = WorktreeHandle::Strong(worktree);
 600                    }
 601                }
 602            }
 603        }
 604        self.send_project_updates(cx);
 605    }
 606
 607    pub fn unshared(&mut self, cx: &mut ModelContext<Self>) {
 608        self.retain_worktrees = false;
 609        self.downstream_client.take();
 610
 611        // When not shared, only retain the visible worktrees
 612        for worktree_handle in self.worktrees.iter_mut() {
 613            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 614                let is_visible = worktree.update(cx, |worktree, _| {
 615                    worktree.stop_observing_updates();
 616                    worktree.is_visible()
 617                });
 618                if !is_visible {
 619                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 620                }
 621            }
 622        }
 623    }
 624
    /// Searches the visible, local worktrees and returns the paths of files that *might*
    /// match `query`.
    ///
    /// Worktrees that are not local are skipped. Entries listed in `open_entries` are
    /// reported without being checked against the file on disk. The returned channel
    /// yields paths in worktree order and stops after `limit` paths have been sent.
    pub fn find_search_candidates(
        &self,
        query: SearchQuery,
        limit: usize,
        open_entries: HashSet<ProjectEntryId>,
        fs: Arc<dyn Fs>,
        cx: &ModelContext<Self>,
    ) -> Receiver<ProjectPath> {
        // Snapshot every visible local worktree together with its settings; `as_local()?`
        // drops worktrees that are not local.
        let snapshots = self
            .visible_worktrees(cx)
            .filter_map(|tree| {
                let tree = tree.read(cx);
                Some((tree.snapshot(), tree.as_local()?.settings()))
            })
            .collect::<Vec<_>>();

        let executor = cx.background_executor().clone();

        // We want to return entries in the order they are in the worktrees, so we have one
        // thread that iterates over the worktrees (and ignored directories) as necessary,
        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
        // channel.
        // We spawn a number of workers that take items from the filter channel and check the query
        // against the version of the file on disk.
        let (filter_tx, filter_rx) = smol::channel::bounded(64);
        let (output_tx, mut output_rx) = smol::channel::bounded(64);
        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();

        // Producer: walks the snapshots and feeds both channels.
        let input = cx.background_executor().spawn({
            let fs = fs.clone();
            let query = query.clone();
            async move {
                Self::find_candidate_paths(
                    fs,
                    snapshots,
                    open_entries,
                    query,
                    filter_tx,
                    output_tx,
                )
                .await
                .log_err();
            }
        });
        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
        // Workers: each one pulls candidates off the filter channel.
        let filters = cx.background_executor().spawn(async move {
            let fs = &fs;
            let query = &query;
            executor
                .scoped(move |scope| {
                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
                        let filter_rx = filter_rx.clone();
                        scope.spawn(async move {
                            Self::filter_paths(fs, filter_rx, query).await.log_err();
                        })
                    }
                })
                .await;
        });
        // Collector: resolves the per-candidate receivers in order and forwards the paths
        // that came back.
        cx.background_executor()
            .spawn(async move {
                let mut matched = 0;
                while let Some(mut receiver) = output_rx.next().await {
                    // No path on this receiver: the candidate was not reported, move on.
                    let Some(path) = receiver.next().await else {
                        continue;
                    };
                    // The caller dropped the result channel; nothing left to do.
                    let Ok(_) = matching_paths_tx.send(path).await else {
                        break;
                    };
                    matched += 1;
                    // NOTE(review): `matched` is at least 1 here, so a `limit` of 0 never
                    // triggers this break and the search runs unbounded — confirm intended.
                    if matched == limit {
                        break;
                    }
                }
                // Dropping the task handles here keeps the producer and workers alive only
                // for as long as the collector runs.
                drop(input);
                drop(filters);
            })
            .detach();
        matching_paths_rx
    }
 706
 707    fn scan_ignored_dir<'a>(
 708        fs: &'a Arc<dyn Fs>,
 709        snapshot: &'a worktree::Snapshot,
 710        path: &'a Path,
 711        query: &'a SearchQuery,
 712        include_root: bool,
 713        filter_tx: &'a Sender<MatchingEntry>,
 714        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 715    ) -> BoxFuture<'a, Result<()>> {
 716        async move {
 717            let abs_path = snapshot.abs_path().join(path);
 718            let Some(mut files) = fs
 719                .read_dir(&abs_path)
 720                .await
 721                .with_context(|| format!("listing ignored path {abs_path:?}"))
 722                .log_err()
 723            else {
 724                return Ok(());
 725            };
 726
 727            let mut results = Vec::new();
 728
 729            while let Some(Ok(file)) = files.next().await {
 730                let Some(metadata) = fs
 731                    .metadata(&file)
 732                    .await
 733                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 734                    .log_err()
 735                    .flatten()
 736                else {
 737                    continue;
 738                };
 739                if metadata.is_symlink || metadata.is_fifo {
 740                    continue;
 741                }
 742                results.push((
 743                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 744                    !metadata.is_dir,
 745                ))
 746            }
 747            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
 748            for (path, is_file) in results {
 749                if is_file {
 750                    if query.filters_path() {
 751                        let matched_path = if include_root {
 752                            let mut full_path = PathBuf::from(snapshot.root_name());
 753                            full_path.push(&path);
 754                            query.file_matches(&full_path)
 755                        } else {
 756                            query.file_matches(&path)
 757                        };
 758                        if !matched_path {
 759                            continue;
 760                        }
 761                    }
 762                    let (tx, rx) = oneshot::channel();
 763                    output_tx.send(rx).await?;
 764                    filter_tx
 765                        .send(MatchingEntry {
 766                            respond: tx,
 767                            worktree_path: snapshot.abs_path().clone(),
 768                            path: ProjectPath {
 769                                worktree_id: snapshot.id(),
 770                                path: Arc::from(path),
 771                            },
 772                        })
 773                        .await?;
 774                } else {
 775                    Self::scan_ignored_dir(
 776                        fs,
 777                        snapshot,
 778                        &path,
 779                        query,
 780                        include_root,
 781                        filter_tx,
 782                        output_tx,
 783                    )
 784                    .await?;
 785                }
 786            }
 787            Ok(())
 788        }
 789        .boxed()
 790    }
 791
 792    async fn find_candidate_paths(
 793        fs: Arc<dyn Fs>,
 794        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 795        open_entries: HashSet<ProjectEntryId>,
 796        query: SearchQuery,
 797        filter_tx: Sender<MatchingEntry>,
 798        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 799    ) -> Result<()> {
 800        let include_root = snapshots.len() > 1;
 801        for (snapshot, settings) in snapshots {
 802            for entry in snapshot.entries(query.include_ignored(), 0) {
 803                if entry.is_dir() && entry.is_ignored {
 804                    if !settings.is_path_excluded(&entry.path) {
 805                        Self::scan_ignored_dir(
 806                            &fs,
 807                            &snapshot,
 808                            &entry.path,
 809                            &query,
 810                            include_root,
 811                            &filter_tx,
 812                            &output_tx,
 813                        )
 814                        .await?;
 815                    }
 816                    continue;
 817                }
 818
 819                if entry.is_fifo || !entry.is_file() {
 820                    continue;
 821                }
 822
 823                if query.filters_path() {
 824                    let matched_path = if include_root {
 825                        let mut full_path = PathBuf::from(snapshot.root_name());
 826                        full_path.push(&entry.path);
 827                        query.file_matches(&full_path)
 828                    } else {
 829                        query.file_matches(&entry.path)
 830                    };
 831                    if !matched_path {
 832                        continue;
 833                    }
 834                }
 835
 836                let (mut tx, rx) = oneshot::channel();
 837
 838                if open_entries.contains(&entry.id) {
 839                    tx.send(ProjectPath {
 840                        worktree_id: snapshot.id(),
 841                        path: entry.path.clone(),
 842                    })
 843                    .await?;
 844                } else {
 845                    filter_tx
 846                        .send(MatchingEntry {
 847                            respond: tx,
 848                            worktree_path: snapshot.abs_path().clone(),
 849                            path: ProjectPath {
 850                                worktree_id: snapshot.id(),
 851                                path: entry.path.clone(),
 852                            },
 853                        })
 854                        .await?;
 855                }
 856
 857                output_tx.send(rx).await?;
 858            }
 859        }
 860        Ok(())
 861    }
 862
 863    pub fn branches(
 864        &self,
 865        project_path: ProjectPath,
 866        cx: &AppContext,
 867    ) -> Task<Result<Vec<git::repository::Branch>>> {
 868        let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
 869            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 870        };
 871
 872        match worktree.read(cx) {
 873            Worktree::Local(local_worktree) => {
 874                let branches = util::maybe!({
 875                    let worktree_error = |error| {
 876                        format!(
 877                            "{} for worktree {}",
 878                            error,
 879                            local_worktree.abs_path().to_string_lossy()
 880                        )
 881                    };
 882
 883                    let entry = local_worktree
 884                        .git_entry(project_path.path)
 885                        .with_context(|| worktree_error("No git entry found"))?;
 886
 887                    let repo = local_worktree
 888                        .get_local_repo(&entry)
 889                        .with_context(|| worktree_error("No repository found"))?
 890                        .repo()
 891                        .clone();
 892
 893                    repo.branches()
 894                });
 895
 896                Task::ready(branches)
 897            }
 898            Worktree::Remote(remote_worktree) => {
 899                let request = remote_worktree.client().request(proto::GitBranches {
 900                    project_id: remote_worktree.project_id(),
 901                    repository: Some(proto::ProjectPath {
 902                        worktree_id: project_path.worktree_id.to_proto(),
 903                        path: project_path.path.to_string_lossy().to_string(), // Root path
 904                    }),
 905                });
 906
 907                cx.background_executor().spawn(async move {
 908                    let response = request.await?;
 909
 910                    let branches = response
 911                        .branches
 912                        .into_iter()
 913                        .map(|proto_branch| git::repository::Branch {
 914                            is_head: proto_branch.is_head,
 915                            name: proto_branch.name.into(),
 916                            unix_timestamp: proto_branch
 917                                .unix_timestamp
 918                                .map(|timestamp| timestamp as i64),
 919                        })
 920                        .collect();
 921
 922                    Ok(branches)
 923                })
 924            }
 925        }
 926    }
 927
 928    pub fn update_or_create_branch(
 929        &self,
 930        repository: ProjectPath,
 931        new_branch: String,
 932        cx: &AppContext,
 933    ) -> Task<Result<()>> {
 934        let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
 935            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 936        };
 937
 938        match worktree.read(cx) {
 939            Worktree::Local(local_worktree) => {
 940                let result = util::maybe!({
 941                    let worktree_error = |error| {
 942                        format!(
 943                            "{} for worktree {}",
 944                            error,
 945                            local_worktree.abs_path().to_string_lossy()
 946                        )
 947                    };
 948
 949                    let entry = local_worktree
 950                        .git_entry(repository.path)
 951                        .with_context(|| worktree_error("No git entry found"))?;
 952
 953                    let repo = local_worktree
 954                        .get_local_repo(&entry)
 955                        .with_context(|| worktree_error("No repository found"))?
 956                        .repo()
 957                        .clone();
 958
 959                    if !repo.branch_exits(&new_branch)? {
 960                        repo.create_branch(&new_branch)?;
 961                    }
 962
 963                    repo.change_branch(&new_branch)?;
 964
 965                    Ok(())
 966                });
 967
 968                Task::ready(result)
 969            }
 970            Worktree::Remote(remote_worktree) => {
 971                let request = remote_worktree.client().request(proto::UpdateGitBranch {
 972                    project_id: remote_worktree.project_id(),
 973                    repository: Some(proto::ProjectPath {
 974                        worktree_id: repository.worktree_id.to_proto(),
 975                        path: repository.path.to_string_lossy().to_string(), // Root path
 976                    }),
 977                    branch_name: new_branch,
 978                });
 979
 980                cx.background_executor().spawn(async move {
 981                    request.await?;
 982                    Ok(())
 983                })
 984            }
 985        }
 986    }
 987
 988    async fn filter_paths(
 989        fs: &Arc<dyn Fs>,
 990        mut input: Receiver<MatchingEntry>,
 991        query: &SearchQuery,
 992    ) -> Result<()> {
 993        while let Some(mut entry) = input.next().await {
 994            let abs_path = entry.worktree_path.join(&entry.path.path);
 995            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
 996                continue;
 997            };
 998            if query.detect(file).unwrap_or(false) {
 999                entry.respond.send(entry.path).await?
1000            }
1001        }
1002
1003        Ok(())
1004    }
1005
1006    pub async fn handle_create_project_entry(
1007        this: Model<Self>,
1008        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1009        mut cx: AsyncAppContext,
1010    ) -> Result<proto::ProjectEntryResponse> {
1011        let worktree = this.update(&mut cx, |this, cx| {
1012            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1013            this.worktree_for_id(worktree_id, cx)
1014                .ok_or_else(|| anyhow!("worktree not found"))
1015        })??;
1016        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1017    }
1018
1019    pub async fn handle_rename_project_entry(
1020        this: Model<super::Project>,
1021        envelope: TypedEnvelope<proto::RenameProjectEntry>,
1022        mut cx: AsyncAppContext,
1023    ) -> Result<proto::ProjectEntryResponse> {
1024        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1025        let (worktree_id, worktree, old_path, is_dir) = this
1026            .update(&mut cx, |this, cx| {
1027                this.worktree_store
1028                    .read(cx)
1029                    .worktree_and_entry_for_id(entry_id, cx)
1030                    .map(|(worktree, entry)| {
1031                        (
1032                            worktree.read(cx).id(),
1033                            worktree,
1034                            entry.path.clone(),
1035                            entry.is_dir(),
1036                        )
1037                    })
1038            })?
1039            .ok_or_else(|| anyhow!("worktree not found"))?;
1040        let (old_abs_path, new_abs_path) = {
1041            let root_path = worktree.update(&mut cx, |this, _| this.abs_path())?;
1042            (
1043                root_path.join(&old_path),
1044                root_path.join(&envelope.payload.new_path),
1045            )
1046        };
1047        let lsp_store = this
1048            .update(&mut cx, |this, _| this.lsp_store())?
1049            .downgrade();
1050        LspStore::will_rename_entry(
1051            lsp_store,
1052            worktree_id,
1053            &old_abs_path,
1054            &new_abs_path,
1055            is_dir,
1056            cx.clone(),
1057        )
1058        .await;
1059        let response = Worktree::handle_rename_entry(worktree, envelope.payload, cx.clone()).await;
1060        this.update(&mut cx, |this, cx| {
1061            this.lsp_store().read(cx).did_rename_entry(
1062                worktree_id,
1063                &old_abs_path,
1064                &new_abs_path,
1065                is_dir,
1066            );
1067        })
1068        .ok();
1069        response
1070    }
1071
1072    pub async fn handle_copy_project_entry(
1073        this: Model<Self>,
1074        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1075        mut cx: AsyncAppContext,
1076    ) -> Result<proto::ProjectEntryResponse> {
1077        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1078        let worktree = this.update(&mut cx, |this, cx| {
1079            this.worktree_for_entry(entry_id, cx)
1080                .ok_or_else(|| anyhow!("worktree not found"))
1081        })??;
1082        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
1083    }
1084
1085    pub async fn handle_delete_project_entry(
1086        this: Model<Self>,
1087        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1088        mut cx: AsyncAppContext,
1089    ) -> Result<proto::ProjectEntryResponse> {
1090        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1091        let worktree = this.update(&mut cx, |this, cx| {
1092            this.worktree_for_entry(entry_id, cx)
1093                .ok_or_else(|| anyhow!("worktree not found"))
1094        })??;
1095        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1096    }
1097
1098    pub async fn handle_expand_project_entry(
1099        this: Model<Self>,
1100        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1101        mut cx: AsyncAppContext,
1102    ) -> Result<proto::ExpandProjectEntryResponse> {
1103        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1104        let worktree = this
1105            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1106            .ok_or_else(|| anyhow!("invalid request"))?;
1107        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1108    }
1109
1110    pub async fn handle_git_branches(
1111        this: Model<Self>,
1112        branches: TypedEnvelope<proto::GitBranches>,
1113        cx: AsyncAppContext,
1114    ) -> Result<proto::GitBranchesResponse> {
1115        let project_path = branches
1116            .payload
1117            .repository
1118            .clone()
1119            .context("Invalid GitBranches call")?;
1120        let project_path = ProjectPath {
1121            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1122            path: Path::new(&project_path.path).into(),
1123        };
1124
1125        let branches = this
1126            .read_with(&cx, |this, cx| this.branches(project_path, cx))?
1127            .await?;
1128
1129        Ok(proto::GitBranchesResponse {
1130            branches: branches
1131                .into_iter()
1132                .map(|branch| proto::Branch {
1133                    is_head: branch.is_head,
1134                    name: branch.name.to_string(),
1135                    unix_timestamp: branch.unix_timestamp.map(|timestamp| timestamp as u64),
1136                })
1137                .collect(),
1138        })
1139    }
1140
1141    pub async fn handle_update_branch(
1142        this: Model<Self>,
1143        update_branch: TypedEnvelope<proto::UpdateGitBranch>,
1144        cx: AsyncAppContext,
1145    ) -> Result<proto::Ack> {
1146        let project_path = update_branch
1147            .payload
1148            .repository
1149            .clone()
1150            .context("Invalid GitBranches call")?;
1151        let project_path = ProjectPath {
1152            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1153            path: Path::new(&project_path.path).into(),
1154        };
1155        let new_branch = update_branch.payload.branch_name;
1156
1157        this.read_with(&cx, |this, cx| {
1158            this.update_or_create_branch(project_path, new_branch, cx)
1159        })?
1160        .await?;
1161
1162        Ok(proto::Ack {})
1163    }
1164}
1165
1166#[derive(Clone, Debug)]
1167enum WorktreeHandle {
1168    Strong(Model<Worktree>),
1169    Weak(WeakModel<Worktree>),
1170}
1171
1172impl WorktreeHandle {
1173    fn upgrade(&self) -> Option<Model<Worktree>> {
1174        match self {
1175            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1176            WorktreeHandle::Weak(handle) => handle.upgrade(),
1177        }
1178    }
1179}