worktree_store.rs

   1use std::{
   2    path::{Path, PathBuf},
   3    sync::{atomic::AtomicUsize, Arc},
   4};
   5
   6use anyhow::{anyhow, Context as _, Result};
   7use collections::{HashMap, HashSet};
   8use fs::Fs;
   9use futures::{
  10    future::{BoxFuture, Shared},
  11    FutureExt, SinkExt,
  12};
  13use gpui::{
  14    AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, Task, WeakModel,
  15};
  16use postage::oneshot;
  17use rpc::{
  18    proto::{self, SSH_PROJECT_ID},
  19    AnyProtoClient, ErrorExt, TypedEnvelope,
  20};
  21use smol::{
  22    channel::{Receiver, Sender},
  23    stream::StreamExt,
  24};
  25use text::ReplicaId;
  26use util::{paths::compare_paths, ResultExt};
  27use worktree::{Entry, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
  28
  29use crate::{search::SearchQuery, ProjectPath};
  30
  31struct MatchingEntry {
  32    worktree_path: Arc<Path>,
  33    path: ProjectPath,
  34    respond: oneshot::Sender<ProjectPath>,
  35}
  36
  37enum WorktreeStoreState {
  38    Local {
  39        fs: Arc<dyn Fs>,
  40    },
  41    Remote {
  42        upstream_client: AnyProtoClient,
  43        upstream_project_id: u64,
  44    },
  45}
  46
  47pub struct WorktreeStore {
  48    next_entry_id: Arc<AtomicUsize>,
  49    downstream_client: Option<(AnyProtoClient, u64)>,
  50    retain_worktrees: bool,
  51    worktrees: Vec<WorktreeHandle>,
  52    worktrees_reordered: bool,
  53    #[allow(clippy::type_complexity)]
  54    loading_worktrees:
  55        HashMap<Arc<Path>, Shared<Task<Result<Model<Worktree>, Arc<anyhow::Error>>>>>,
  56    state: WorktreeStoreState,
  57}
  58
  59pub enum WorktreeStoreEvent {
  60    WorktreeAdded(Model<Worktree>),
  61    WorktreeRemoved(EntityId, WorktreeId),
  62    WorktreeReleased(EntityId, WorktreeId),
  63    WorktreeOrderChanged,
  64    WorktreeUpdateSent(Model<Worktree>),
  65}
  66
  67impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  68
  69impl WorktreeStore {
  70    pub fn init(client: &AnyProtoClient) {
  71        client.add_model_request_handler(Self::handle_create_project_entry);
  72        client.add_model_request_handler(Self::handle_rename_project_entry);
  73        client.add_model_request_handler(Self::handle_copy_project_entry);
  74        client.add_model_request_handler(Self::handle_delete_project_entry);
  75        client.add_model_request_handler(Self::handle_expand_project_entry);
  76        client.add_model_request_handler(Self::handle_git_branches);
  77        client.add_model_request_handler(Self::handle_update_branch);
  78    }
  79
  80    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  81        Self {
  82            next_entry_id: Default::default(),
  83            loading_worktrees: Default::default(),
  84            downstream_client: None,
  85            worktrees: Vec::new(),
  86            worktrees_reordered: false,
  87            retain_worktrees,
  88            state: WorktreeStoreState::Local { fs },
  89        }
  90    }
  91
  92    pub fn remote(
  93        retain_worktrees: bool,
  94        upstream_client: AnyProtoClient,
  95        upstream_project_id: u64,
  96    ) -> Self {
  97        Self {
  98            next_entry_id: Default::default(),
  99            loading_worktrees: Default::default(),
 100            downstream_client: None,
 101            worktrees: Vec::new(),
 102            worktrees_reordered: false,
 103            retain_worktrees,
 104            state: WorktreeStoreState::Remote {
 105                upstream_client,
 106                upstream_project_id,
 107            },
 108        }
 109    }
 110
 111    /// Iterates through all worktrees, including ones that don't appear in the project panel
 112    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
 113        self.worktrees
 114            .iter()
 115            .filter_map(move |worktree| worktree.upgrade())
 116    }
 117
 118    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 119    pub fn visible_worktrees<'a>(
 120        &'a self,
 121        cx: &'a AppContext,
 122    ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
 123        self.worktrees()
 124            .filter(|worktree| worktree.read(cx).is_visible())
 125    }
 126
 127    pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
 128        self.worktrees()
 129            .find(|worktree| worktree.read(cx).id() == id)
 130    }
 131
 132    pub fn current_branch(&self, repository: ProjectPath, cx: &AppContext) -> Option<Arc<str>> {
 133        self.worktree_for_id(repository.worktree_id, cx)?
 134            .read(cx)
 135            .git_entry(repository.path)?
 136            .branch()
 137    }
 138
 139    pub fn worktree_for_entry(
 140        &self,
 141        entry_id: ProjectEntryId,
 142        cx: &AppContext,
 143    ) -> Option<Model<Worktree>> {
 144        self.worktrees()
 145            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 146    }
 147
 148    pub fn find_worktree(
 149        &self,
 150        abs_path: &Path,
 151        cx: &AppContext,
 152    ) -> Option<(Model<Worktree>, PathBuf)> {
 153        for tree in self.worktrees() {
 154            if let Ok(relative_path) = abs_path.strip_prefix(tree.read(cx).abs_path()) {
 155                return Some((tree.clone(), relative_path.into()));
 156            }
 157        }
 158        None
 159    }
 160
 161    pub fn find_or_create_worktree(
 162        &mut self,
 163        abs_path: impl AsRef<Path>,
 164        visible: bool,
 165        cx: &mut ModelContext<Self>,
 166    ) -> Task<Result<(Model<Worktree>, PathBuf)>> {
 167        let abs_path = abs_path.as_ref();
 168        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 169            Task::ready(Ok((tree, relative_path)))
 170        } else {
 171            let worktree = self.create_worktree(abs_path, visible, cx);
 172            cx.background_executor()
 173                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 174        }
 175    }
 176
 177    pub fn entry_for_id<'a>(
 178        &'a self,
 179        entry_id: ProjectEntryId,
 180        cx: &'a AppContext,
 181    ) -> Option<&'a Entry> {
 182        self.worktrees()
 183            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 184    }
 185
 186    pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
 187        self.worktree_for_id(path.worktree_id, cx)?
 188            .read(cx)
 189            .entry_for_path(&path.path)
 190            .cloned()
 191    }
 192
 193    pub fn create_worktree(
 194        &mut self,
 195        abs_path: impl AsRef<Path>,
 196        visible: bool,
 197        cx: &mut ModelContext<Self>,
 198    ) -> Task<Result<Model<Worktree>>> {
 199        let path: Arc<Path> = abs_path.as_ref().into();
 200        if !self.loading_worktrees.contains_key(&path) {
 201            let task = match &self.state {
 202                WorktreeStoreState::Remote {
 203                    upstream_client, ..
 204                } => {
 205                    if upstream_client.is_via_collab() {
 206                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 207                    } else {
 208                        self.create_ssh_worktree(upstream_client.clone(), abs_path, visible, cx)
 209                    }
 210                }
 211                WorktreeStoreState::Local { fs } => {
 212                    self.create_local_worktree(fs.clone(), abs_path, visible, cx)
 213                }
 214            };
 215
 216            self.loading_worktrees.insert(path.clone(), task.shared());
 217        }
 218        let task = self.loading_worktrees.get(&path).unwrap().clone();
 219        cx.spawn(|this, mut cx| async move {
 220            let result = task.await;
 221            this.update(&mut cx, |this, _| this.loading_worktrees.remove(&path))
 222                .ok();
 223            match result {
 224                Ok(worktree) => Ok(worktree),
 225                Err(err) => Err((*err).cloned()),
 226            }
 227        })
 228    }
 229
 230    fn create_ssh_worktree(
 231        &mut self,
 232        client: AnyProtoClient,
 233        abs_path: impl AsRef<Path>,
 234        visible: bool,
 235        cx: &mut ModelContext<Self>,
 236    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 237        let path_key: Arc<Path> = abs_path.as_ref().into();
 238        let mut abs_path = path_key.clone().to_string_lossy().to_string();
 239        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 240        // in which case want to strip the leading the `/`.
 241        // On the host-side, the `~` will get expanded.
 242        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 243        if abs_path.starts_with("/~") {
 244            abs_path = abs_path[1..].to_string();
 245        }
 246        if abs_path.is_empty() || abs_path == "/" {
 247            abs_path = "~/".to_string();
 248        }
 249        cx.spawn(|this, mut cx| async move {
 250            let this = this.upgrade().context("Dropped worktree store")?;
 251
 252            let response = client
 253                .request(proto::AddWorktree {
 254                    project_id: SSH_PROJECT_ID,
 255                    path: abs_path.clone(),
 256                    visible,
 257                })
 258                .await?;
 259
 260            if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
 261                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 262            })? {
 263                return Ok(existing_worktree);
 264            }
 265
 266            let root_name = PathBuf::from(&response.canonicalized_path)
 267                .file_name()
 268                .map(|n| n.to_string_lossy().to_string())
 269                .unwrap_or(response.canonicalized_path.to_string());
 270
 271            let worktree = cx.update(|cx| {
 272                Worktree::remote(
 273                    SSH_PROJECT_ID,
 274                    0,
 275                    proto::WorktreeMetadata {
 276                        id: response.worktree_id,
 277                        root_name,
 278                        visible,
 279                        abs_path: response.canonicalized_path,
 280                    },
 281                    client,
 282                    cx,
 283                )
 284            })?;
 285
 286            this.update(&mut cx, |this, cx| {
 287                this.add(&worktree, cx);
 288            })?;
 289            Ok(worktree)
 290        })
 291    }
 292
 293    fn create_local_worktree(
 294        &mut self,
 295        fs: Arc<dyn Fs>,
 296        abs_path: impl AsRef<Path>,
 297        visible: bool,
 298        cx: &mut ModelContext<Self>,
 299    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 300        let next_entry_id = self.next_entry_id.clone();
 301        let path: Arc<Path> = abs_path.as_ref().into();
 302
 303        cx.spawn(move |this, mut cx| async move {
 304            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;
 305
 306            let worktree = worktree?;
 307            this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;
 308
 309            if visible {
 310                cx.update(|cx| {
 311                    cx.add_recent_document(&path);
 312                })
 313                .log_err();
 314            }
 315
 316            Ok(worktree)
 317        })
 318    }
 319
 320    pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 321        let worktree_id = worktree.read(cx).id();
 322        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 323
 324        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 325        let handle = if push_strong_handle {
 326            WorktreeHandle::Strong(worktree.clone())
 327        } else {
 328            WorktreeHandle::Weak(worktree.downgrade())
 329        };
 330        if self.worktrees_reordered {
 331            self.worktrees.push(handle);
 332        } else {
 333            let i = match self
 334                .worktrees
 335                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 336                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 337                }) {
 338                Ok(i) | Err(i) => i,
 339            };
 340            self.worktrees.insert(i, handle);
 341        }
 342
 343        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 344        self.send_project_updates(cx);
 345
 346        let handle_id = worktree.entity_id();
 347        cx.observe_release(worktree, move |this, worktree, cx| {
 348            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 349                handle_id,
 350                worktree.id(),
 351            ));
 352            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 353                handle_id,
 354                worktree.id(),
 355            ));
 356            this.send_project_updates(cx);
 357        })
 358        .detach();
 359    }
 360
 361    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
 362        self.worktrees.retain(|worktree| {
 363            if let Some(worktree) = worktree.upgrade() {
 364                if worktree.read(cx).id() == id_to_remove {
 365                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 366                        worktree.entity_id(),
 367                        id_to_remove,
 368                    ));
 369                    false
 370                } else {
 371                    true
 372                }
 373            } else {
 374                false
 375            }
 376        });
 377        self.send_project_updates(cx);
 378    }
 379
 380    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 381        self.worktrees_reordered = worktrees_reordered;
 382    }
 383
 384    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 385        match &self.state {
 386            WorktreeStoreState::Remote {
 387                upstream_client,
 388                upstream_project_id,
 389                ..
 390            } => Some((upstream_client.clone(), *upstream_project_id)),
 391            WorktreeStoreState::Local { .. } => None,
 392        }
 393    }
 394
 395    pub fn set_worktrees_from_proto(
 396        &mut self,
 397        worktrees: Vec<proto::WorktreeMetadata>,
 398        replica_id: ReplicaId,
 399        cx: &mut ModelContext<Self>,
 400    ) -> Result<()> {
 401        let mut old_worktrees_by_id = self
 402            .worktrees
 403            .drain(..)
 404            .filter_map(|worktree| {
 405                let worktree = worktree.upgrade()?;
 406                Some((worktree.read(cx).id(), worktree))
 407            })
 408            .collect::<HashMap<_, _>>();
 409
 410        let (client, project_id) = self
 411            .upstream_client()
 412            .clone()
 413            .ok_or_else(|| anyhow!("invalid project"))?;
 414
 415        for worktree in worktrees {
 416            if let Some(old_worktree) =
 417                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 418            {
 419                let push_strong_handle =
 420                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 421                let handle = if push_strong_handle {
 422                    WorktreeHandle::Strong(old_worktree.clone())
 423                } else {
 424                    WorktreeHandle::Weak(old_worktree.downgrade())
 425                };
 426                self.worktrees.push(handle);
 427            } else {
 428                self.add(
 429                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 430                    cx,
 431                );
 432            }
 433        }
 434        self.send_project_updates(cx);
 435
 436        Ok(())
 437    }
 438
 439    pub fn move_worktree(
 440        &mut self,
 441        source: WorktreeId,
 442        destination: WorktreeId,
 443        cx: &mut ModelContext<Self>,
 444    ) -> Result<()> {
 445        if source == destination {
 446            return Ok(());
 447        }
 448
 449        let mut source_index = None;
 450        let mut destination_index = None;
 451        for (i, worktree) in self.worktrees.iter().enumerate() {
 452            if let Some(worktree) = worktree.upgrade() {
 453                let worktree_id = worktree.read(cx).id();
 454                if worktree_id == source {
 455                    source_index = Some(i);
 456                    if destination_index.is_some() {
 457                        break;
 458                    }
 459                } else if worktree_id == destination {
 460                    destination_index = Some(i);
 461                    if source_index.is_some() {
 462                        break;
 463                    }
 464                }
 465            }
 466        }
 467
 468        let source_index =
 469            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 470        let destination_index =
 471            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 472
 473        if source_index == destination_index {
 474            return Ok(());
 475        }
 476
 477        let worktree_to_move = self.worktrees.remove(source_index);
 478        self.worktrees.insert(destination_index, worktree_to_move);
 479        self.worktrees_reordered = true;
 480        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 481        cx.notify();
 482        Ok(())
 483    }
 484
 485    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 486        for worktree in &self.worktrees {
 487            if let Some(worktree) = worktree.upgrade() {
 488                worktree.update(cx, |worktree, _| {
 489                    if let Some(worktree) = worktree.as_remote_mut() {
 490                        worktree.disconnected_from_host();
 491                    }
 492                });
 493            }
 494        }
 495    }
 496
 497    pub fn send_project_updates(&mut self, cx: &mut ModelContext<Self>) {
 498        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 499            return;
 500        };
 501
 502        let update = proto::UpdateProject {
 503            project_id,
 504            worktrees: self.worktree_metadata_protos(cx),
 505        };
 506
 507        // collab has bad concurrency guarantees, so we send requests in serial.
 508        let update_project = if downstream_client.is_via_collab() {
 509            Some(downstream_client.request(update))
 510        } else {
 511            downstream_client.send(update).log_err();
 512            None
 513        };
 514        cx.spawn(|this, mut cx| async move {
 515            if let Some(update_project) = update_project {
 516                update_project.await?;
 517            }
 518
 519            this.update(&mut cx, |this, cx| {
 520                let worktrees = this.worktrees().collect::<Vec<_>>();
 521
 522                for worktree in worktrees {
 523                    worktree.update(cx, |worktree, cx| {
 524                        let client = downstream_client.clone();
 525                        worktree.observe_updates(project_id, cx, {
 526                            move |update| {
 527                                let client = client.clone();
 528                                async move {
 529                                    if client.is_via_collab() {
 530                                        client
 531                                            .request(update)
 532                                            .map(|result| result.log_err().is_some())
 533                                            .await
 534                                    } else {
 535                                        client.send(update).log_err().is_some()
 536                                    }
 537                                }
 538                            }
 539                        });
 540                    });
 541
 542                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 543                }
 544
 545                anyhow::Ok(())
 546            })
 547        })
 548        .detach_and_log_err(cx);
 549    }
 550
 551    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
 552        self.worktrees()
 553            .map(|worktree| {
 554                let worktree = worktree.read(cx);
 555                proto::WorktreeMetadata {
 556                    id: worktree.id().to_proto(),
 557                    root_name: worktree.root_name().into(),
 558                    visible: worktree.is_visible(),
 559                    abs_path: worktree.abs_path().to_string_lossy().into(),
 560                }
 561            })
 562            .collect()
 563    }
 564
 565    pub fn shared(
 566        &mut self,
 567        remote_id: u64,
 568        downsteam_client: AnyProtoClient,
 569        cx: &mut ModelContext<Self>,
 570    ) {
 571        self.retain_worktrees = true;
 572        self.downstream_client = Some((downsteam_client, remote_id));
 573
 574        // When shared, retain all worktrees
 575        for worktree_handle in self.worktrees.iter_mut() {
 576            match worktree_handle {
 577                WorktreeHandle::Strong(_) => {}
 578                WorktreeHandle::Weak(worktree) => {
 579                    if let Some(worktree) = worktree.upgrade() {
 580                        *worktree_handle = WorktreeHandle::Strong(worktree);
 581                    }
 582                }
 583            }
 584        }
 585        self.send_project_updates(cx);
 586    }
 587
 588    pub fn unshared(&mut self, cx: &mut ModelContext<Self>) {
 589        self.retain_worktrees = false;
 590        self.downstream_client.take();
 591
 592        // When not shared, only retain the visible worktrees
 593        for worktree_handle in self.worktrees.iter_mut() {
 594            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 595                let is_visible = worktree.update(cx, |worktree, _| {
 596                    worktree.stop_observing_updates();
 597                    worktree.is_visible()
 598                });
 599                if !is_visible {
 600                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 601                }
 602            }
 603        }
 604    }
 605
 606    /// search over all worktrees and return buffers that *might* match the search.
 607    pub fn find_search_candidates(
 608        &self,
 609        query: SearchQuery,
 610        limit: usize,
 611        open_entries: HashSet<ProjectEntryId>,
 612        fs: Arc<dyn Fs>,
 613        cx: &ModelContext<Self>,
 614    ) -> Receiver<ProjectPath> {
 615        let snapshots = self
 616            .visible_worktrees(cx)
 617            .filter_map(|tree| {
 618                let tree = tree.read(cx);
 619                Some((tree.snapshot(), tree.as_local()?.settings()))
 620            })
 621            .collect::<Vec<_>>();
 622
 623        let executor = cx.background_executor().clone();
 624
 625        // We want to return entries in the order they are in the worktrees, so we have one
 626        // thread that iterates over the worktrees (and ignored directories) as necessary,
 627        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 628        // channel.
 629        // We spawn a number of workers that take items from the filter channel and check the query
 630        // against the version of the file on disk.
 631        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 632        let (output_tx, mut output_rx) = smol::channel::bounded(64);
 633        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 634
 635        let input = cx.background_executor().spawn({
 636            let fs = fs.clone();
 637            let query = query.clone();
 638            async move {
 639                Self::find_candidate_paths(
 640                    fs,
 641                    snapshots,
 642                    open_entries,
 643                    query,
 644                    filter_tx,
 645                    output_tx,
 646                )
 647                .await
 648                .log_err();
 649            }
 650        });
 651        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 652        let filters = cx.background_executor().spawn(async move {
 653            let fs = &fs;
 654            let query = &query;
 655            executor
 656                .scoped(move |scope| {
 657                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 658                        let filter_rx = filter_rx.clone();
 659                        scope.spawn(async move {
 660                            Self::filter_paths(fs, filter_rx, query).await.log_err();
 661                        })
 662                    }
 663                })
 664                .await;
 665        });
 666        cx.background_executor()
 667            .spawn(async move {
 668                let mut matched = 0;
 669                while let Some(mut receiver) = output_rx.next().await {
 670                    let Some(path) = receiver.next().await else {
 671                        continue;
 672                    };
 673                    let Ok(_) = matching_paths_tx.send(path).await else {
 674                        break;
 675                    };
 676                    matched += 1;
 677                    if matched == limit {
 678                        break;
 679                    }
 680                }
 681                drop(input);
 682                drop(filters);
 683            })
 684            .detach();
 685        matching_paths_rx
 686    }
 687
 688    fn scan_ignored_dir<'a>(
 689        fs: &'a Arc<dyn Fs>,
 690        snapshot: &'a worktree::Snapshot,
 691        path: &'a Path,
 692        query: &'a SearchQuery,
 693        include_root: bool,
 694        filter_tx: &'a Sender<MatchingEntry>,
 695        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 696    ) -> BoxFuture<'a, Result<()>> {
 697        async move {
 698            let abs_path = snapshot.abs_path().join(path);
 699            let Some(mut files) = fs
 700                .read_dir(&abs_path)
 701                .await
 702                .with_context(|| format!("listing ignored path {abs_path:?}"))
 703                .log_err()
 704            else {
 705                return Ok(());
 706            };
 707
 708            let mut results = Vec::new();
 709
 710            while let Some(Ok(file)) = files.next().await {
 711                let Some(metadata) = fs
 712                    .metadata(&file)
 713                    .await
 714                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 715                    .log_err()
 716                    .flatten()
 717                else {
 718                    continue;
 719                };
 720                if metadata.is_symlink || metadata.is_fifo {
 721                    continue;
 722                }
 723                results.push((
 724                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 725                    !metadata.is_dir,
 726                ))
 727            }
 728            results.sort_by(|(a_path, a_is_file), (b_path, b_is_file)| {
 729                compare_paths((a_path, *a_is_file), (b_path, *b_is_file))
 730            });
 731            for (path, is_file) in results {
 732                if is_file {
 733                    if query.filters_path() {
 734                        let matched_path = if include_root {
 735                            let mut full_path = PathBuf::from(snapshot.root_name());
 736                            full_path.push(&path);
 737                            query.file_matches(&full_path)
 738                        } else {
 739                            query.file_matches(&path)
 740                        };
 741                        if !matched_path {
 742                            continue;
 743                        }
 744                    }
 745                    let (tx, rx) = oneshot::channel();
 746                    output_tx.send(rx).await?;
 747                    filter_tx
 748                        .send(MatchingEntry {
 749                            respond: tx,
 750                            worktree_path: snapshot.abs_path().clone(),
 751                            path: ProjectPath {
 752                                worktree_id: snapshot.id(),
 753                                path: Arc::from(path),
 754                            },
 755                        })
 756                        .await?;
 757                } else {
 758                    Self::scan_ignored_dir(
 759                        fs,
 760                        snapshot,
 761                        &path,
 762                        query,
 763                        include_root,
 764                        filter_tx,
 765                        output_tx,
 766                    )
 767                    .await?;
 768                }
 769            }
 770            Ok(())
 771        }
 772        .boxed()
 773    }
 774
 775    async fn find_candidate_paths(
 776        fs: Arc<dyn Fs>,
 777        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 778        open_entries: HashSet<ProjectEntryId>,
 779        query: SearchQuery,
 780        filter_tx: Sender<MatchingEntry>,
 781        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 782    ) -> Result<()> {
 783        let include_root = snapshots.len() > 1;
 784        for (snapshot, settings) in snapshots {
 785            let mut entries: Vec<_> = snapshot.entries(query.include_ignored(), 0).collect();
 786            entries.sort_by(|a, b| compare_paths((&a.path, a.is_file()), (&b.path, b.is_file())));
 787            for entry in entries {
 788                if entry.is_dir() && entry.is_ignored {
 789                    if !settings.is_path_excluded(&entry.path) {
 790                        Self::scan_ignored_dir(
 791                            &fs,
 792                            &snapshot,
 793                            &entry.path,
 794                            &query,
 795                            include_root,
 796                            &filter_tx,
 797                            &output_tx,
 798                        )
 799                        .await?;
 800                    }
 801                    continue;
 802                }
 803
 804                if entry.is_fifo || !entry.is_file() {
 805                    continue;
 806                }
 807
 808                if query.filters_path() {
 809                    let matched_path = if include_root {
 810                        let mut full_path = PathBuf::from(snapshot.root_name());
 811                        full_path.push(&entry.path);
 812                        query.file_matches(&full_path)
 813                    } else {
 814                        query.file_matches(&entry.path)
 815                    };
 816                    if !matched_path {
 817                        continue;
 818                    }
 819                }
 820
 821                let (mut tx, rx) = oneshot::channel();
 822
 823                if open_entries.contains(&entry.id) {
 824                    tx.send(ProjectPath {
 825                        worktree_id: snapshot.id(),
 826                        path: entry.path.clone(),
 827                    })
 828                    .await?;
 829                } else {
 830                    filter_tx
 831                        .send(MatchingEntry {
 832                            respond: tx,
 833                            worktree_path: snapshot.abs_path().clone(),
 834                            path: ProjectPath {
 835                                worktree_id: snapshot.id(),
 836                                path: entry.path.clone(),
 837                            },
 838                        })
 839                        .await?;
 840                }
 841
 842                output_tx.send(rx).await?;
 843            }
 844        }
 845        Ok(())
 846    }
 847
 848    pub fn branches(
 849        &self,
 850        project_path: ProjectPath,
 851        cx: &AppContext,
 852    ) -> Task<Result<Vec<git::repository::Branch>>> {
 853        let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
 854            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 855        };
 856
 857        match worktree.read(cx) {
 858            Worktree::Local(local_worktree) => {
 859                let branches = util::maybe!({
 860                    let worktree_error = |error| {
 861                        format!(
 862                            "{} for worktree {}",
 863                            error,
 864                            local_worktree.abs_path().to_string_lossy()
 865                        )
 866                    };
 867
 868                    let entry = local_worktree
 869                        .git_entry(project_path.path)
 870                        .with_context(|| worktree_error("No git entry found"))?;
 871
 872                    let repo = local_worktree
 873                        .get_local_repo(&entry)
 874                        .with_context(|| worktree_error("No repository found"))?
 875                        .repo()
 876                        .clone();
 877
 878                    repo.branches()
 879                });
 880
 881                Task::ready(branches)
 882            }
 883            Worktree::Remote(remote_worktree) => {
 884                let request = remote_worktree.client().request(proto::GitBranches {
 885                    project_id: remote_worktree.project_id(),
 886                    repository: Some(proto::ProjectPath {
 887                        worktree_id: project_path.worktree_id.to_proto(),
 888                        path: project_path.path.to_string_lossy().to_string(), // Root path
 889                    }),
 890                });
 891
 892                cx.background_executor().spawn(async move {
 893                    let response = request.await?;
 894
 895                    let branches = response
 896                        .branches
 897                        .into_iter()
 898                        .map(|proto_branch| git::repository::Branch {
 899                            is_head: proto_branch.is_head,
 900                            name: proto_branch.name.into(),
 901                            unix_timestamp: proto_branch
 902                                .unix_timestamp
 903                                .map(|timestamp| timestamp as i64),
 904                        })
 905                        .collect();
 906
 907                    Ok(branches)
 908                })
 909            }
 910        }
 911    }
 912
 913    pub fn update_or_create_branch(
 914        &self,
 915        repository: ProjectPath,
 916        new_branch: String,
 917        cx: &AppContext,
 918    ) -> Task<Result<()>> {
 919        let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
 920            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 921        };
 922
 923        match worktree.read(cx) {
 924            Worktree::Local(local_worktree) => {
 925                let result = util::maybe!({
 926                    let worktree_error = |error| {
 927                        format!(
 928                            "{} for worktree {}",
 929                            error,
 930                            local_worktree.abs_path().to_string_lossy()
 931                        )
 932                    };
 933
 934                    let entry = local_worktree
 935                        .git_entry(repository.path)
 936                        .with_context(|| worktree_error("No git entry found"))?;
 937
 938                    let repo = local_worktree
 939                        .get_local_repo(&entry)
 940                        .with_context(|| worktree_error("No repository found"))?
 941                        .repo()
 942                        .clone();
 943
 944                    if !repo.branch_exits(&new_branch)? {
 945                        repo.create_branch(&new_branch)?;
 946                    }
 947
 948                    repo.change_branch(&new_branch)?;
 949
 950                    Ok(())
 951                });
 952
 953                Task::ready(result)
 954            }
 955            Worktree::Remote(remote_worktree) => {
 956                let request = remote_worktree.client().request(proto::UpdateGitBranch {
 957                    project_id: remote_worktree.project_id(),
 958                    repository: Some(proto::ProjectPath {
 959                        worktree_id: repository.worktree_id.to_proto(),
 960                        path: repository.path.to_string_lossy().to_string(), // Root path
 961                    }),
 962                    branch_name: new_branch,
 963                });
 964
 965                cx.background_executor().spawn(async move {
 966                    request.await?;
 967                    Ok(())
 968                })
 969            }
 970        }
 971    }
 972
 973    async fn filter_paths(
 974        fs: &Arc<dyn Fs>,
 975        mut input: Receiver<MatchingEntry>,
 976        query: &SearchQuery,
 977    ) -> Result<()> {
 978        while let Some(mut entry) = input.next().await {
 979            let abs_path = entry.worktree_path.join(&entry.path.path);
 980            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
 981                continue;
 982            };
 983            if query.detect(file).unwrap_or(false) {
 984                entry.respond.send(entry.path).await?
 985            }
 986        }
 987
 988        Ok(())
 989    }
 990
 991    pub async fn handle_create_project_entry(
 992        this: Model<Self>,
 993        envelope: TypedEnvelope<proto::CreateProjectEntry>,
 994        mut cx: AsyncAppContext,
 995    ) -> Result<proto::ProjectEntryResponse> {
 996        let worktree = this.update(&mut cx, |this, cx| {
 997            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 998            this.worktree_for_id(worktree_id, cx)
 999                .ok_or_else(|| anyhow!("worktree not found"))
1000        })??;
1001        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1002    }
1003
1004    pub async fn handle_rename_project_entry(
1005        this: Model<Self>,
1006        envelope: TypedEnvelope<proto::RenameProjectEntry>,
1007        mut cx: AsyncAppContext,
1008    ) -> Result<proto::ProjectEntryResponse> {
1009        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1010        let worktree = this.update(&mut cx, |this, cx| {
1011            this.worktree_for_entry(entry_id, cx)
1012                .ok_or_else(|| anyhow!("worktree not found"))
1013        })??;
1014        Worktree::handle_rename_entry(worktree, envelope.payload, cx).await
1015    }
1016
1017    pub async fn handle_copy_project_entry(
1018        this: Model<Self>,
1019        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1020        mut cx: AsyncAppContext,
1021    ) -> Result<proto::ProjectEntryResponse> {
1022        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1023        let worktree = this.update(&mut cx, |this, cx| {
1024            this.worktree_for_entry(entry_id, cx)
1025                .ok_or_else(|| anyhow!("worktree not found"))
1026        })??;
1027        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
1028    }
1029
1030    pub async fn handle_delete_project_entry(
1031        this: Model<Self>,
1032        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1033        mut cx: AsyncAppContext,
1034    ) -> Result<proto::ProjectEntryResponse> {
1035        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1036        let worktree = this.update(&mut cx, |this, cx| {
1037            this.worktree_for_entry(entry_id, cx)
1038                .ok_or_else(|| anyhow!("worktree not found"))
1039        })??;
1040        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1041    }
1042
1043    pub async fn handle_expand_project_entry(
1044        this: Model<Self>,
1045        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1046        mut cx: AsyncAppContext,
1047    ) -> Result<proto::ExpandProjectEntryResponse> {
1048        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1049        let worktree = this
1050            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1051            .ok_or_else(|| anyhow!("invalid request"))?;
1052        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1053    }
1054
1055    pub async fn handle_git_branches(
1056        this: Model<Self>,
1057        branches: TypedEnvelope<proto::GitBranches>,
1058        cx: AsyncAppContext,
1059    ) -> Result<proto::GitBranchesResponse> {
1060        let project_path = branches
1061            .payload
1062            .repository
1063            .clone()
1064            .context("Invalid GitBranches call")?;
1065        let project_path = ProjectPath {
1066            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1067            path: Path::new(&project_path.path).into(),
1068        };
1069
1070        let branches = this
1071            .read_with(&cx, |this, cx| this.branches(project_path, cx))?
1072            .await?;
1073
1074        Ok(proto::GitBranchesResponse {
1075            branches: branches
1076                .into_iter()
1077                .map(|branch| proto::Branch {
1078                    is_head: branch.is_head,
1079                    name: branch.name.to_string(),
1080                    unix_timestamp: branch.unix_timestamp.map(|timestamp| timestamp as u64),
1081                })
1082                .collect(),
1083        })
1084    }
1085
1086    pub async fn handle_update_branch(
1087        this: Model<Self>,
1088        update_branch: TypedEnvelope<proto::UpdateGitBranch>,
1089        cx: AsyncAppContext,
1090    ) -> Result<proto::Ack> {
1091        let project_path = update_branch
1092            .payload
1093            .repository
1094            .clone()
1095            .context("Invalid GitBranches call")?;
1096        let project_path = ProjectPath {
1097            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1098            path: Path::new(&project_path.path).into(),
1099        };
1100        let new_branch = update_branch.payload.branch_name;
1101
1102        this.read_with(&cx, |this, cx| {
1103            this.update_or_create_branch(project_path, new_branch, cx)
1104        })?
1105        .await?;
1106
1107        Ok(proto::Ack {})
1108    }
1109}
1110
1111#[derive(Clone, Debug)]
1112enum WorktreeHandle {
1113    Strong(Model<Worktree>),
1114    Weak(WeakModel<Worktree>),
1115}
1116
1117impl WorktreeHandle {
1118    fn upgrade(&self) -> Option<Model<Worktree>> {
1119        match self {
1120            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1121            WorktreeHandle::Weak(handle) => handle.upgrade(),
1122        }
1123    }
1124}