worktree_store.rs

   1use std::{
   2    path::{Path, PathBuf},
   3    sync::{atomic::AtomicUsize, Arc},
   4};
   5
   6use anyhow::{anyhow, Context as _, Result};
   7use collections::{HashMap, HashSet};
   8use fs::Fs;
   9use futures::{
  10    future::{BoxFuture, Shared},
  11    FutureExt, SinkExt,
  12};
  13use gpui::{
  14    AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, Task, WeakModel,
  15};
  16use postage::oneshot;
  17use rpc::{
  18    proto::{self, SSH_PROJECT_ID},
  19    AnyProtoClient, ErrorExt, TypedEnvelope,
  20};
  21use smol::{
  22    channel::{Receiver, Sender},
  23    stream::StreamExt,
  24};
  25use text::ReplicaId;
  26use util::ResultExt;
  27use worktree::{Entry, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
  28
  29use crate::{search::SearchQuery, ProjectPath};
  30
/// A file that passed path-level filtering during a search and is queued on the filter
/// channel so a worker can check the query against the file's contents on disk.
struct MatchingEntry {
    /// Absolute path of the worktree snapshot the file was found in.
    worktree_path: Arc<Path>,
    /// Worktree id and worktree-relative path identifying the file.
    path: ProjectPath,
    /// One-shot sender whose paired receiver is pushed onto the ordered output channel.
    respond: oneshot::Sender<ProjectPath>,
}
  36
/// Backing mode of a [`WorktreeStore`]; decides how new worktrees are created.
enum WorktreeStoreState {
    /// Worktrees are created directly against `fs`.
    Local {
        fs: Arc<dyn Fs>,
    },
    /// Worktrees mirror a project reachable through `upstream_client`.
    Remote {
        upstream_client: AnyProtoClient,
        upstream_project_id: u64,
    },
}
  46
/// Owns the set of worktrees of a project and keeps a downstream client (if any)
/// informed about them.
pub struct WorktreeStore {
    /// Shared counter handed to every locally created worktree.
    next_entry_id: Arc<AtomicUsize>,
    /// Client and project id that project updates are sent to while shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// When true, non-visible worktrees are held with strong handles as well.
    retain_worktrees: bool,
    /// Handles to the known worktrees; weak handles may refer to dropped worktrees.
    worktrees: Vec<WorktreeHandle>,
    /// When true, new worktrees are appended instead of inserted by absolute path.
    worktrees_reordered: bool,
    /// In-flight worktree creation tasks keyed by the requested path, shared so that
    /// concurrent requests for the same path await the same task.
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<Arc<Path>, Shared<Task<Result<Model<Worktree>, Arc<anyhow::Error>>>>>,
    /// Whether this store is local or mirrors a remote project.
    state: WorktreeStoreState,
}
  58
/// Events emitted by [`WorktreeStore`].
pub enum WorktreeStoreEvent {
    /// A worktree was added to the store.
    WorktreeAdded(Model<Worktree>),
    /// A worktree was removed explicitly, or its model was released.
    WorktreeRemoved(EntityId, WorktreeId),
    /// A worktree model tracked by the store was released.
    WorktreeReleased(EntityId, WorktreeId),
    /// A worktree was moved to a different position in the store.
    WorktreeOrderChanged,
    /// Update observation was (re)installed on the worktree for the downstream client.
    WorktreeUpdateSent(Model<Worktree>),
}

impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  68
  69impl WorktreeStore {
    /// Registers the store's model request handlers on `client`.
    ///
    /// The handlers cover project-entry create/rename/copy/delete/expand requests and
    /// the git branch list/update requests (judging by the handler names; the handlers
    /// themselves are defined elsewhere in this file).
    pub fn init(client: &AnyProtoClient) {
        client.add_model_request_handler(Self::handle_create_project_entry);
        client.add_model_request_handler(Self::handle_rename_project_entry);
        client.add_model_request_handler(Self::handle_copy_project_entry);
        client.add_model_request_handler(Self::handle_delete_project_entry);
        client.add_model_request_handler(Self::handle_expand_project_entry);
        client.add_model_request_handler(Self::handle_git_branches);
        client.add_model_request_handler(Self::handle_update_branch);
    }
  79
  80    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  81        Self {
  82            next_entry_id: Default::default(),
  83            loading_worktrees: Default::default(),
  84            downstream_client: None,
  85            worktrees: Vec::new(),
  86            worktrees_reordered: false,
  87            retain_worktrees,
  88            state: WorktreeStoreState::Local { fs },
  89        }
  90    }
  91
  92    pub fn remote(
  93        retain_worktrees: bool,
  94        upstream_client: AnyProtoClient,
  95        upstream_project_id: u64,
  96    ) -> Self {
  97        Self {
  98            next_entry_id: Default::default(),
  99            loading_worktrees: Default::default(),
 100            downstream_client: None,
 101            worktrees: Vec::new(),
 102            worktrees_reordered: false,
 103            retain_worktrees,
 104            state: WorktreeStoreState::Remote {
 105                upstream_client,
 106                upstream_project_id,
 107            },
 108        }
 109    }
 110
 111    /// Iterates through all worktrees, including ones that don't appear in the project panel
 112    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
 113        self.worktrees
 114            .iter()
 115            .filter_map(move |worktree| worktree.upgrade())
 116    }
 117
 118    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 119    pub fn visible_worktrees<'a>(
 120        &'a self,
 121        cx: &'a AppContext,
 122    ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
 123        self.worktrees()
 124            .filter(|worktree| worktree.read(cx).is_visible())
 125    }
 126
 127    pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
 128        self.worktrees()
 129            .find(|worktree| worktree.read(cx).id() == id)
 130    }
 131
 132    pub fn current_branch(&self, repository: ProjectPath, cx: &AppContext) -> Option<Arc<str>> {
 133        self.worktree_for_id(repository.worktree_id, cx)?
 134            .read(cx)
 135            .git_entry(repository.path)?
 136            .branch()
 137    }
 138
 139    pub fn worktree_for_entry(
 140        &self,
 141        entry_id: ProjectEntryId,
 142        cx: &AppContext,
 143    ) -> Option<Model<Worktree>> {
 144        self.worktrees()
 145            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 146    }
 147
 148    pub fn find_worktree(
 149        &self,
 150        abs_path: &Path,
 151        cx: &AppContext,
 152    ) -> Option<(Model<Worktree>, PathBuf)> {
 153        for tree in self.worktrees() {
 154            if let Ok(relative_path) = abs_path.strip_prefix(tree.read(cx).abs_path()) {
 155                return Some((tree.clone(), relative_path.into()));
 156            }
 157        }
 158        None
 159    }
 160
 161    pub fn find_or_create_worktree(
 162        &mut self,
 163        abs_path: impl AsRef<Path>,
 164        visible: bool,
 165        cx: &mut ModelContext<Self>,
 166    ) -> Task<Result<(Model<Worktree>, PathBuf)>> {
 167        let abs_path = abs_path.as_ref();
 168        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 169            Task::ready(Ok((tree, relative_path)))
 170        } else {
 171            let worktree = self.create_worktree(abs_path, visible, cx);
 172            cx.background_executor()
 173                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 174        }
 175    }
 176
 177    pub fn entry_for_id<'a>(
 178        &'a self,
 179        entry_id: ProjectEntryId,
 180        cx: &'a AppContext,
 181    ) -> Option<&'a Entry> {
 182        self.worktrees()
 183            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 184    }
 185
 186    pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
 187        self.worktree_for_id(path.worktree_id, cx)?
 188            .read(cx)
 189            .entry_for_path(&path.path)
 190            .cloned()
 191    }
 192
 193    pub fn create_worktree(
 194        &mut self,
 195        abs_path: impl AsRef<Path>,
 196        visible: bool,
 197        cx: &mut ModelContext<Self>,
 198    ) -> Task<Result<Model<Worktree>>> {
 199        let path: Arc<Path> = abs_path.as_ref().into();
 200        if !self.loading_worktrees.contains_key(&path) {
 201            let task = match &self.state {
 202                WorktreeStoreState::Remote {
 203                    upstream_client, ..
 204                } => {
 205                    if upstream_client.is_via_collab() {
 206                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 207                    } else {
 208                        self.create_ssh_worktree(upstream_client.clone(), abs_path, visible, cx)
 209                    }
 210                }
 211                WorktreeStoreState::Local { fs } => {
 212                    self.create_local_worktree(fs.clone(), abs_path, visible, cx)
 213                }
 214            };
 215
 216            self.loading_worktrees.insert(path.clone(), task.shared());
 217        }
 218        let task = self.loading_worktrees.get(&path).unwrap().clone();
 219        cx.spawn(|this, mut cx| async move {
 220            let result = task.await;
 221            this.update(&mut cx, |this, _| this.loading_worktrees.remove(&path))
 222                .ok();
 223            match result {
 224                Ok(worktree) => Ok(worktree),
 225                Err(err) => Err((*err).cloned()),
 226            }
 227        })
 228    }
 229
 230    fn create_ssh_worktree(
 231        &mut self,
 232        client: AnyProtoClient,
 233        abs_path: impl AsRef<Path>,
 234        visible: bool,
 235        cx: &mut ModelContext<Self>,
 236    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 237        let path_key: Arc<Path> = abs_path.as_ref().into();
 238        let mut abs_path = path_key.clone().to_string_lossy().to_string();
 239        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 240        // in which case want to strip the leading the `/`.
 241        // On the host-side, the `~` will get expanded.
 242        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 243        if abs_path.starts_with("/~") {
 244            abs_path = abs_path[1..].to_string();
 245        }
 246        if abs_path.is_empty() || abs_path == "/" {
 247            abs_path = "~/".to_string();
 248        }
 249        cx.spawn(|this, mut cx| async move {
 250            let this = this.upgrade().context("Dropped worktree store")?;
 251
 252            let response = client
 253                .request(proto::AddWorktree {
 254                    project_id: SSH_PROJECT_ID,
 255                    path: abs_path.clone(),
 256                    visible,
 257                })
 258                .await?;
 259
 260            if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
 261                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 262            })? {
 263                return Ok(existing_worktree);
 264            }
 265
 266            let root_name = PathBuf::from(&response.canonicalized_path)
 267                .file_name()
 268                .map(|n| n.to_string_lossy().to_string())
 269                .unwrap_or(response.canonicalized_path.to_string());
 270
 271            let worktree = cx.update(|cx| {
 272                Worktree::remote(
 273                    SSH_PROJECT_ID,
 274                    0,
 275                    proto::WorktreeMetadata {
 276                        id: response.worktree_id,
 277                        root_name,
 278                        visible,
 279                        abs_path: response.canonicalized_path,
 280                    },
 281                    client,
 282                    cx,
 283                )
 284            })?;
 285
 286            this.update(&mut cx, |this, cx| {
 287                this.add(&worktree, cx);
 288            })?;
 289            Ok(worktree)
 290        })
 291    }
 292
 293    fn create_local_worktree(
 294        &mut self,
 295        fs: Arc<dyn Fs>,
 296        abs_path: impl AsRef<Path>,
 297        visible: bool,
 298        cx: &mut ModelContext<Self>,
 299    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 300        let next_entry_id = self.next_entry_id.clone();
 301        let path: Arc<Path> = abs_path.as_ref().into();
 302
 303        cx.spawn(move |this, mut cx| async move {
 304            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;
 305
 306            let worktree = worktree?;
 307            this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;
 308
 309            if visible {
 310                cx.update(|cx| {
 311                    cx.add_recent_document(&path);
 312                })
 313                .log_err();
 314            }
 315
 316            Ok(worktree)
 317        })
 318    }
 319
 320    pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 321        let worktree_id = worktree.read(cx).id();
 322        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 323
 324        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 325        let handle = if push_strong_handle {
 326            WorktreeHandle::Strong(worktree.clone())
 327        } else {
 328            WorktreeHandle::Weak(worktree.downgrade())
 329        };
 330        if self.worktrees_reordered {
 331            self.worktrees.push(handle);
 332        } else {
 333            let i = match self
 334                .worktrees
 335                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 336                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 337                }) {
 338                Ok(i) | Err(i) => i,
 339            };
 340            self.worktrees.insert(i, handle);
 341        }
 342
 343        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 344        self.send_project_updates(cx);
 345
 346        let handle_id = worktree.entity_id();
 347        cx.observe_release(worktree, move |this, worktree, cx| {
 348            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 349                handle_id,
 350                worktree.id(),
 351            ));
 352            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 353                handle_id,
 354                worktree.id(),
 355            ));
 356            this.send_project_updates(cx);
 357        })
 358        .detach();
 359    }
 360
 361    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
 362        self.worktrees.retain(|worktree| {
 363            if let Some(worktree) = worktree.upgrade() {
 364                if worktree.read(cx).id() == id_to_remove {
 365                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 366                        worktree.entity_id(),
 367                        id_to_remove,
 368                    ));
 369                    false
 370                } else {
 371                    true
 372                }
 373            } else {
 374                false
 375            }
 376        });
 377        self.send_project_updates(cx);
 378    }
 379
    /// Records whether worktrees have been manually reordered. While the flag is set,
    /// newly added worktrees are appended instead of inserted by absolute path.
    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }
 383
 384    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 385        match &self.state {
 386            WorktreeStoreState::Remote {
 387                upstream_client,
 388                upstream_project_id,
 389                ..
 390            } => Some((upstream_client.clone(), *upstream_project_id)),
 391            WorktreeStoreState::Local { .. } => None,
 392        }
 393    }
 394
 395    pub fn set_worktrees_from_proto(
 396        &mut self,
 397        worktrees: Vec<proto::WorktreeMetadata>,
 398        replica_id: ReplicaId,
 399        cx: &mut ModelContext<Self>,
 400    ) -> Result<()> {
 401        let mut old_worktrees_by_id = self
 402            .worktrees
 403            .drain(..)
 404            .filter_map(|worktree| {
 405                let worktree = worktree.upgrade()?;
 406                Some((worktree.read(cx).id(), worktree))
 407            })
 408            .collect::<HashMap<_, _>>();
 409
 410        let (client, project_id) = self
 411            .upstream_client()
 412            .clone()
 413            .ok_or_else(|| anyhow!("invalid project"))?;
 414
 415        for worktree in worktrees {
 416            if let Some(old_worktree) =
 417                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 418            {
 419                let push_strong_handle =
 420                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 421                let handle = if push_strong_handle {
 422                    WorktreeHandle::Strong(old_worktree.clone())
 423                } else {
 424                    WorktreeHandle::Weak(old_worktree.downgrade())
 425                };
 426                self.worktrees.push(handle);
 427            } else {
 428                self.add(
 429                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 430                    cx,
 431                );
 432            }
 433        }
 434        self.send_project_updates(cx);
 435
 436        Ok(())
 437    }
 438
 439    pub fn move_worktree(
 440        &mut self,
 441        source: WorktreeId,
 442        destination: WorktreeId,
 443        cx: &mut ModelContext<Self>,
 444    ) -> Result<()> {
 445        if source == destination {
 446            return Ok(());
 447        }
 448
 449        let mut source_index = None;
 450        let mut destination_index = None;
 451        for (i, worktree) in self.worktrees.iter().enumerate() {
 452            if let Some(worktree) = worktree.upgrade() {
 453                let worktree_id = worktree.read(cx).id();
 454                if worktree_id == source {
 455                    source_index = Some(i);
 456                    if destination_index.is_some() {
 457                        break;
 458                    }
 459                } else if worktree_id == destination {
 460                    destination_index = Some(i);
 461                    if source_index.is_some() {
 462                        break;
 463                    }
 464                }
 465            }
 466        }
 467
 468        let source_index =
 469            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 470        let destination_index =
 471            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 472
 473        if source_index == destination_index {
 474            return Ok(());
 475        }
 476
 477        let worktree_to_move = self.worktrees.remove(source_index);
 478        self.worktrees.insert(destination_index, worktree_to_move);
 479        self.worktrees_reordered = true;
 480        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 481        cx.notify();
 482        Ok(())
 483    }
 484
 485    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 486        for worktree in &self.worktrees {
 487            if let Some(worktree) = worktree.upgrade() {
 488                worktree.update(cx, |worktree, _| {
 489                    if let Some(worktree) = worktree.as_remote_mut() {
 490                        worktree.disconnected_from_host();
 491                    }
 492                });
 493            }
 494        }
 495    }
 496
 497    pub fn send_project_updates(&mut self, cx: &mut ModelContext<Self>) {
 498        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 499            return;
 500        };
 501
 502        let update = proto::UpdateProject {
 503            project_id,
 504            worktrees: self.worktree_metadata_protos(cx),
 505        };
 506
 507        // collab has bad concurrency guarantees, so we send requests in serial.
 508        let update_project = if downstream_client.is_via_collab() {
 509            Some(downstream_client.request(update))
 510        } else {
 511            downstream_client.send(update).log_err();
 512            None
 513        };
 514        cx.spawn(|this, mut cx| async move {
 515            if let Some(update_project) = update_project {
 516                update_project.await?;
 517            }
 518
 519            this.update(&mut cx, |this, cx| {
 520                let worktrees = this.worktrees().collect::<Vec<_>>();
 521
 522                for worktree in worktrees {
 523                    worktree.update(cx, |worktree, cx| {
 524                        let client = downstream_client.clone();
 525                        worktree.observe_updates(project_id, cx, {
 526                            move |update| {
 527                                let client = client.clone();
 528                                async move {
 529                                    if client.is_via_collab() {
 530                                        client
 531                                            .request(update)
 532                                            .map(|result| result.log_err().is_some())
 533                                            .await
 534                                    } else {
 535                                        client.send(update).log_err().is_some()
 536                                    }
 537                                }
 538                            }
 539                        });
 540                    });
 541
 542                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 543                }
 544
 545                anyhow::Ok(())
 546            })
 547        })
 548        .detach_and_log_err(cx);
 549    }
 550
 551    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
 552        self.worktrees()
 553            .map(|worktree| {
 554                let worktree = worktree.read(cx);
 555                proto::WorktreeMetadata {
 556                    id: worktree.id().to_proto(),
 557                    root_name: worktree.root_name().into(),
 558                    visible: worktree.is_visible(),
 559                    abs_path: worktree.abs_path().to_string_lossy().into(),
 560                }
 561            })
 562            .collect()
 563    }
 564
 565    pub fn shared(
 566        &mut self,
 567        remote_id: u64,
 568        downsteam_client: AnyProtoClient,
 569        cx: &mut ModelContext<Self>,
 570    ) {
 571        self.retain_worktrees = true;
 572        self.downstream_client = Some((downsteam_client, remote_id));
 573
 574        // When shared, retain all worktrees
 575        for worktree_handle in self.worktrees.iter_mut() {
 576            match worktree_handle {
 577                WorktreeHandle::Strong(_) => {}
 578                WorktreeHandle::Weak(worktree) => {
 579                    if let Some(worktree) = worktree.upgrade() {
 580                        *worktree_handle = WorktreeHandle::Strong(worktree);
 581                    }
 582                }
 583            }
 584        }
 585        self.send_project_updates(cx);
 586    }
 587
 588    pub fn unshared(&mut self, cx: &mut ModelContext<Self>) {
 589        self.retain_worktrees = false;
 590        self.downstream_client.take();
 591
 592        // When not shared, only retain the visible worktrees
 593        for worktree_handle in self.worktrees.iter_mut() {
 594            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 595                let is_visible = worktree.update(cx, |worktree, _| {
 596                    worktree.stop_observing_updates();
 597                    worktree.is_visible()
 598                });
 599                if !is_visible {
 600                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 601                }
 602            }
 603        }
 604    }
 605
    /// Searches the visible local worktrees and returns a channel of paths whose files
    /// *might* match `query`.
    ///
    /// Entries listed in `open_entries` are reported without checking the file on disk;
    /// every other candidate is checked by a pool of filter workers. Paths arrive on
    /// the returned receiver in worktree order, and the forwarding task stops once
    /// `limit` paths have been sent or the receiver is dropped.
    ///
    /// NOTE(review): the stop condition is `matched == limit`, checked after each send,
    /// so a `limit` of 0 never stops the search early — confirm callers never pass 0.
    pub fn find_search_candidates(
        &self,
        query: SearchQuery,
        limit: usize,
        open_entries: HashSet<ProjectEntryId>,
        fs: Arc<dyn Fs>,
        cx: &ModelContext<Self>,
    ) -> Receiver<ProjectPath> {
        // Worktrees that are not local (`as_local()` is `None`) are skipped.
        let snapshots = self
            .visible_worktrees(cx)
            .filter_map(|tree| {
                let tree = tree.read(cx);
                Some((tree.snapshot(), tree.as_local()?.settings()))
            })
            .collect::<Vec<_>>();

        let executor = cx.background_executor().clone();

        // We want to return entries in the order they are in the worktrees, so we have one
        // thread that iterates over the worktrees (and ignored directories) as necessary,
        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
        // channel.
        // We spawn a number of workers that take items from the filter channel and check the query
        // against the version of the file on disk.
        let (filter_tx, filter_rx) = smol::channel::bounded(64);
        let (output_tx, mut output_rx) = smol::channel::bounded(64);
        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();

        // Producer: walks the snapshots and feeds both the filter and output channels.
        let input = cx.background_executor().spawn({
            let fs = fs.clone();
            let query = query.clone();
            async move {
                Self::find_candidate_paths(
                    fs,
                    snapshots,
                    open_entries,
                    query,
                    filter_tx,
                    output_tx,
                )
                .await
                .log_err();
            }
        });
        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
        // Workers: each one pulls entries from the shared filter channel.
        let filters = cx.background_executor().spawn(async move {
            let fs = &fs;
            let query = &query;
            executor
                .scoped(move |scope| {
                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
                        let filter_rx = filter_rx.clone();
                        scope.spawn(async move {
                            Self::filter_paths(fs, filter_rx, query).await.log_err();
                        })
                    }
                })
                .await;
        });
        // Consumer: resolves the one-shot receivers in order and forwards the paths.
        cx.background_executor()
            .spawn(async move {
                let mut matched = 0;
                while let Some(mut receiver) = output_rx.next().await {
                    // A receiver that yields nothing means the candidate did not match.
                    let Some(path) = receiver.next().await else {
                        continue;
                    };
                    // A send error means the caller dropped the result receiver.
                    let Ok(_) = matching_paths_tx.send(path).await else {
                        break;
                    };
                    matched += 1;
                    if matched == limit {
                        break;
                    }
                }
                // The producer and worker tasks are kept alive inside this task until
                // here; presumably dropping them cancels any remaining work — confirm
                // against gpui's `Task` drop semantics.
                drop(input);
                drop(filters);
            })
            .detach();
        matching_paths_rx
    }
 687
 688    fn scan_ignored_dir<'a>(
 689        fs: &'a Arc<dyn Fs>,
 690        snapshot: &'a worktree::Snapshot,
 691        path: &'a Path,
 692        query: &'a SearchQuery,
 693        include_root: bool,
 694        filter_tx: &'a Sender<MatchingEntry>,
 695        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 696    ) -> BoxFuture<'a, Result<()>> {
 697        async move {
 698            let abs_path = snapshot.abs_path().join(path);
 699            let Some(mut files) = fs
 700                .read_dir(&abs_path)
 701                .await
 702                .with_context(|| format!("listing ignored path {abs_path:?}"))
 703                .log_err()
 704            else {
 705                return Ok(());
 706            };
 707
 708            let mut results = Vec::new();
 709
 710            while let Some(Ok(file)) = files.next().await {
 711                let Some(metadata) = fs
 712                    .metadata(&file)
 713                    .await
 714                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 715                    .log_err()
 716                    .flatten()
 717                else {
 718                    continue;
 719                };
 720                if metadata.is_symlink || metadata.is_fifo {
 721                    continue;
 722                }
 723                results.push((
 724                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 725                    !metadata.is_dir,
 726                ))
 727            }
 728            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
 729            for (path, is_file) in results {
 730                if is_file {
 731                    if query.filters_path() {
 732                        let matched_path = if include_root {
 733                            let mut full_path = PathBuf::from(snapshot.root_name());
 734                            full_path.push(&path);
 735                            query.file_matches(&full_path)
 736                        } else {
 737                            query.file_matches(&path)
 738                        };
 739                        if !matched_path {
 740                            continue;
 741                        }
 742                    }
 743                    let (tx, rx) = oneshot::channel();
 744                    output_tx.send(rx).await?;
 745                    filter_tx
 746                        .send(MatchingEntry {
 747                            respond: tx,
 748                            worktree_path: snapshot.abs_path().clone(),
 749                            path: ProjectPath {
 750                                worktree_id: snapshot.id(),
 751                                path: Arc::from(path),
 752                            },
 753                        })
 754                        .await?;
 755                } else {
 756                    Self::scan_ignored_dir(
 757                        fs,
 758                        snapshot,
 759                        &path,
 760                        query,
 761                        include_root,
 762                        filter_tx,
 763                        output_tx,
 764                    )
 765                    .await?;
 766                }
 767            }
 768            Ok(())
 769        }
 770        .boxed()
 771    }
 772
 773    async fn find_candidate_paths(
 774        fs: Arc<dyn Fs>,
 775        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 776        open_entries: HashSet<ProjectEntryId>,
 777        query: SearchQuery,
 778        filter_tx: Sender<MatchingEntry>,
 779        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 780    ) -> Result<()> {
 781        let include_root = snapshots.len() > 1;
 782        for (snapshot, settings) in snapshots {
 783            for entry in snapshot.entries(query.include_ignored(), 0) {
 784                if entry.is_dir() && entry.is_ignored {
 785                    if !settings.is_path_excluded(&entry.path) {
 786                        Self::scan_ignored_dir(
 787                            &fs,
 788                            &snapshot,
 789                            &entry.path,
 790                            &query,
 791                            include_root,
 792                            &filter_tx,
 793                            &output_tx,
 794                        )
 795                        .await?;
 796                    }
 797                    continue;
 798                }
 799
 800                if entry.is_fifo || !entry.is_file() {
 801                    continue;
 802                }
 803
 804                if query.filters_path() {
 805                    let matched_path = if include_root {
 806                        let mut full_path = PathBuf::from(snapshot.root_name());
 807                        full_path.push(&entry.path);
 808                        query.file_matches(&full_path)
 809                    } else {
 810                        query.file_matches(&entry.path)
 811                    };
 812                    if !matched_path {
 813                        continue;
 814                    }
 815                }
 816
 817                let (mut tx, rx) = oneshot::channel();
 818
 819                if open_entries.contains(&entry.id) {
 820                    tx.send(ProjectPath {
 821                        worktree_id: snapshot.id(),
 822                        path: entry.path.clone(),
 823                    })
 824                    .await?;
 825                } else {
 826                    filter_tx
 827                        .send(MatchingEntry {
 828                            respond: tx,
 829                            worktree_path: snapshot.abs_path().clone(),
 830                            path: ProjectPath {
 831                                worktree_id: snapshot.id(),
 832                                path: entry.path.clone(),
 833                            },
 834                        })
 835                        .await?;
 836                }
 837
 838                output_tx.send(rx).await?;
 839            }
 840        }
 841        Ok(())
 842    }
 843
 844    pub fn branches(
 845        &self,
 846        project_path: ProjectPath,
 847        cx: &AppContext,
 848    ) -> Task<Result<Vec<git::repository::Branch>>> {
 849        let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
 850            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 851        };
 852
 853        match worktree.read(cx) {
 854            Worktree::Local(local_worktree) => {
 855                let branches = util::maybe!({
 856                    let worktree_error = |error| {
 857                        format!(
 858                            "{} for worktree {}",
 859                            error,
 860                            local_worktree.abs_path().to_string_lossy()
 861                        )
 862                    };
 863
 864                    let entry = local_worktree
 865                        .git_entry(project_path.path)
 866                        .with_context(|| worktree_error("No git entry found"))?;
 867
 868                    let repo = local_worktree
 869                        .get_local_repo(&entry)
 870                        .with_context(|| worktree_error("No repository found"))?
 871                        .repo()
 872                        .clone();
 873
 874                    repo.branches()
 875                });
 876
 877                Task::ready(branches)
 878            }
 879            Worktree::Remote(remote_worktree) => {
 880                let request = remote_worktree.client().request(proto::GitBranches {
 881                    project_id: remote_worktree.project_id(),
 882                    repository: Some(proto::ProjectPath {
 883                        worktree_id: project_path.worktree_id.to_proto(),
 884                        path: project_path.path.to_string_lossy().to_string(), // Root path
 885                    }),
 886                });
 887
 888                cx.background_executor().spawn(async move {
 889                    let response = request.await?;
 890
 891                    let branches = response
 892                        .branches
 893                        .into_iter()
 894                        .map(|proto_branch| git::repository::Branch {
 895                            is_head: proto_branch.is_head,
 896                            name: proto_branch.name.into(),
 897                            unix_timestamp: proto_branch
 898                                .unix_timestamp
 899                                .map(|timestamp| timestamp as i64),
 900                        })
 901                        .collect();
 902
 903                    Ok(branches)
 904                })
 905            }
 906        }
 907    }
 908
 909    pub fn update_or_create_branch(
 910        &self,
 911        repository: ProjectPath,
 912        new_branch: String,
 913        cx: &AppContext,
 914    ) -> Task<Result<()>> {
 915        let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
 916            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 917        };
 918
 919        match worktree.read(cx) {
 920            Worktree::Local(local_worktree) => {
 921                let result = util::maybe!({
 922                    let worktree_error = |error| {
 923                        format!(
 924                            "{} for worktree {}",
 925                            error,
 926                            local_worktree.abs_path().to_string_lossy()
 927                        )
 928                    };
 929
 930                    let entry = local_worktree
 931                        .git_entry(repository.path)
 932                        .with_context(|| worktree_error("No git entry found"))?;
 933
 934                    let repo = local_worktree
 935                        .get_local_repo(&entry)
 936                        .with_context(|| worktree_error("No repository found"))?
 937                        .repo()
 938                        .clone();
 939
 940                    if !repo.branch_exits(&new_branch)? {
 941                        repo.create_branch(&new_branch)?;
 942                    }
 943
 944                    repo.change_branch(&new_branch)?;
 945
 946                    Ok(())
 947                });
 948
 949                Task::ready(result)
 950            }
 951            Worktree::Remote(remote_worktree) => {
 952                let request = remote_worktree.client().request(proto::UpdateGitBranch {
 953                    project_id: remote_worktree.project_id(),
 954                    repository: Some(proto::ProjectPath {
 955                        worktree_id: repository.worktree_id.to_proto(),
 956                        path: repository.path.to_string_lossy().to_string(), // Root path
 957                    }),
 958                    branch_name: new_branch,
 959                });
 960
 961                cx.background_executor().spawn(async move {
 962                    request.await?;
 963                    Ok(())
 964                })
 965            }
 966        }
 967    }
 968
 969    async fn filter_paths(
 970        fs: &Arc<dyn Fs>,
 971        mut input: Receiver<MatchingEntry>,
 972        query: &SearchQuery,
 973    ) -> Result<()> {
 974        while let Some(mut entry) = input.next().await {
 975            let abs_path = entry.worktree_path.join(&entry.path.path);
 976            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
 977                continue;
 978            };
 979            if query.detect(file).unwrap_or(false) {
 980                entry.respond.send(entry.path).await?
 981            }
 982        }
 983
 984        Ok(())
 985    }
 986
 987    pub async fn handle_create_project_entry(
 988        this: Model<Self>,
 989        envelope: TypedEnvelope<proto::CreateProjectEntry>,
 990        mut cx: AsyncAppContext,
 991    ) -> Result<proto::ProjectEntryResponse> {
 992        let worktree = this.update(&mut cx, |this, cx| {
 993            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 994            this.worktree_for_id(worktree_id, cx)
 995                .ok_or_else(|| anyhow!("worktree not found"))
 996        })??;
 997        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
 998    }
 999
1000    pub async fn handle_rename_project_entry(
1001        this: Model<Self>,
1002        envelope: TypedEnvelope<proto::RenameProjectEntry>,
1003        mut cx: AsyncAppContext,
1004    ) -> Result<proto::ProjectEntryResponse> {
1005        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1006        let worktree = this.update(&mut cx, |this, cx| {
1007            this.worktree_for_entry(entry_id, cx)
1008                .ok_or_else(|| anyhow!("worktree not found"))
1009        })??;
1010        Worktree::handle_rename_entry(worktree, envelope.payload, cx).await
1011    }
1012
1013    pub async fn handle_copy_project_entry(
1014        this: Model<Self>,
1015        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1016        mut cx: AsyncAppContext,
1017    ) -> Result<proto::ProjectEntryResponse> {
1018        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1019        let worktree = this.update(&mut cx, |this, cx| {
1020            this.worktree_for_entry(entry_id, cx)
1021                .ok_or_else(|| anyhow!("worktree not found"))
1022        })??;
1023        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
1024    }
1025
1026    pub async fn handle_delete_project_entry(
1027        this: Model<Self>,
1028        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1029        mut cx: AsyncAppContext,
1030    ) -> Result<proto::ProjectEntryResponse> {
1031        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1032        let worktree = this.update(&mut cx, |this, cx| {
1033            this.worktree_for_entry(entry_id, cx)
1034                .ok_or_else(|| anyhow!("worktree not found"))
1035        })??;
1036        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1037    }
1038
1039    pub async fn handle_expand_project_entry(
1040        this: Model<Self>,
1041        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1042        mut cx: AsyncAppContext,
1043    ) -> Result<proto::ExpandProjectEntryResponse> {
1044        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1045        let worktree = this
1046            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1047            .ok_or_else(|| anyhow!("invalid request"))?;
1048        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1049    }
1050
1051    pub async fn handle_git_branches(
1052        this: Model<Self>,
1053        branches: TypedEnvelope<proto::GitBranches>,
1054        cx: AsyncAppContext,
1055    ) -> Result<proto::GitBranchesResponse> {
1056        let project_path = branches
1057            .payload
1058            .repository
1059            .clone()
1060            .context("Invalid GitBranches call")?;
1061        let project_path = ProjectPath {
1062            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1063            path: Path::new(&project_path.path).into(),
1064        };
1065
1066        let branches = this
1067            .read_with(&cx, |this, cx| this.branches(project_path, cx))?
1068            .await?;
1069
1070        Ok(proto::GitBranchesResponse {
1071            branches: branches
1072                .into_iter()
1073                .map(|branch| proto::Branch {
1074                    is_head: branch.is_head,
1075                    name: branch.name.to_string(),
1076                    unix_timestamp: branch.unix_timestamp.map(|timestamp| timestamp as u64),
1077                })
1078                .collect(),
1079        })
1080    }
1081
1082    pub async fn handle_update_branch(
1083        this: Model<Self>,
1084        update_branch: TypedEnvelope<proto::UpdateGitBranch>,
1085        cx: AsyncAppContext,
1086    ) -> Result<proto::Ack> {
1087        let project_path = update_branch
1088            .payload
1089            .repository
1090            .clone()
1091            .context("Invalid GitBranches call")?;
1092        let project_path = ProjectPath {
1093            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1094            path: Path::new(&project_path.path).into(),
1095        };
1096        let new_branch = update_branch.payload.branch_name;
1097
1098        this.read_with(&cx, |this, cx| {
1099            this.update_or_create_branch(project_path, new_branch, cx)
1100        })?
1101        .await?;
1102
1103        Ok(proto::Ack {})
1104    }
1105}
1106
/// A handle to a worktree model that is either strong or weak.
#[derive(Clone, Debug)]
enum WorktreeHandle {
    /// Holds a `Model<Worktree>` directly.
    Strong(Model<Worktree>),
    /// Holds a `WeakModel<Worktree>`, which must be upgraded before use and
    /// may fail to upgrade.
    Weak(WeakModel<Worktree>),
}
1112
1113impl WorktreeHandle {
1114    fn upgrade(&self) -> Option<Model<Worktree>> {
1115        match self {
1116            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1117            WorktreeHandle::Weak(handle) => handle.upgrade(),
1118        }
1119    }
1120}