worktree_store.rs

   1use std::{
   2    path::{Path, PathBuf},
   3    sync::{atomic::AtomicUsize, Arc},
   4};
   5
   6use anyhow::{anyhow, Context as _, Result};
   7use collections::{HashMap, HashSet};
   8use fs::Fs;
   9use futures::{
  10    future::{BoxFuture, Shared},
  11    FutureExt, SinkExt,
  12};
  13use gpui::{
  14    AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, Task, WeakModel,
  15};
  16use postage::oneshot;
  17use rpc::{
  18    proto::{self, SSH_PROJECT_ID},
  19    AnyProtoClient, ErrorExt, TypedEnvelope,
  20};
  21use smol::{
  22    channel::{Receiver, Sender},
  23    stream::StreamExt,
  24};
  25use text::ReplicaId;
  26use util::{paths::SanitizedPath, ResultExt};
  27use worktree::{Entry, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
  28
  29use crate::{search::SearchQuery, ProjectPath};
  30
  31struct MatchingEntry {
  32    worktree_path: Arc<Path>,
  33    path: ProjectPath,
  34    respond: oneshot::Sender<ProjectPath>,
  35}
  36
  37enum WorktreeStoreState {
  38    Local {
  39        fs: Arc<dyn Fs>,
  40    },
  41    Remote {
  42        upstream_client: AnyProtoClient,
  43        upstream_project_id: u64,
  44    },
  45}
  46
  47pub struct WorktreeStore {
  48    next_entry_id: Arc<AtomicUsize>,
  49    downstream_client: Option<(AnyProtoClient, u64)>,
  50    retain_worktrees: bool,
  51    worktrees: Vec<WorktreeHandle>,
  52    worktrees_reordered: bool,
  53    #[allow(clippy::type_complexity)]
  54    loading_worktrees:
  55        HashMap<SanitizedPath, Shared<Task<Result<Model<Worktree>, Arc<anyhow::Error>>>>>,
  56    state: WorktreeStoreState,
  57}
  58
  59pub enum WorktreeStoreEvent {
  60    WorktreeAdded(Model<Worktree>),
  61    WorktreeRemoved(EntityId, WorktreeId),
  62    WorktreeReleased(EntityId, WorktreeId),
  63    WorktreeOrderChanged,
  64    WorktreeUpdateSent(Model<Worktree>),
  65}
  66
  67impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  68
  69impl WorktreeStore {
  70    pub fn init(client: &AnyProtoClient) {
  71        client.add_model_request_handler(Self::handle_create_project_entry);
  72        client.add_model_request_handler(Self::handle_rename_project_entry);
  73        client.add_model_request_handler(Self::handle_copy_project_entry);
  74        client.add_model_request_handler(Self::handle_delete_project_entry);
  75        client.add_model_request_handler(Self::handle_expand_project_entry);
  76        client.add_model_request_handler(Self::handle_git_branches);
  77        client.add_model_request_handler(Self::handle_update_branch);
  78    }
  79
  80    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  81        Self {
  82            next_entry_id: Default::default(),
  83            loading_worktrees: Default::default(),
  84            downstream_client: None,
  85            worktrees: Vec::new(),
  86            worktrees_reordered: false,
  87            retain_worktrees,
  88            state: WorktreeStoreState::Local { fs },
  89        }
  90    }
  91
  92    pub fn remote(
  93        retain_worktrees: bool,
  94        upstream_client: AnyProtoClient,
  95        upstream_project_id: u64,
  96    ) -> Self {
  97        Self {
  98            next_entry_id: Default::default(),
  99            loading_worktrees: Default::default(),
 100            downstream_client: None,
 101            worktrees: Vec::new(),
 102            worktrees_reordered: false,
 103            retain_worktrees,
 104            state: WorktreeStoreState::Remote {
 105                upstream_client,
 106                upstream_project_id,
 107            },
 108        }
 109    }
 110
 111    /// Iterates through all worktrees, including ones that don't appear in the project panel
 112    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
 113        self.worktrees
 114            .iter()
 115            .filter_map(move |worktree| worktree.upgrade())
 116    }
 117
 118    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 119    pub fn visible_worktrees<'a>(
 120        &'a self,
 121        cx: &'a AppContext,
 122    ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
 123        self.worktrees()
 124            .filter(|worktree| worktree.read(cx).is_visible())
 125    }
 126
 127    pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
 128        self.worktrees()
 129            .find(|worktree| worktree.read(cx).id() == id)
 130    }
 131
 132    pub fn current_branch(&self, repository: ProjectPath, cx: &AppContext) -> Option<Arc<str>> {
 133        self.worktree_for_id(repository.worktree_id, cx)?
 134            .read(cx)
 135            .git_entry(repository.path)?
 136            .branch()
 137    }
 138
 139    pub fn worktree_for_entry(
 140        &self,
 141        entry_id: ProjectEntryId,
 142        cx: &AppContext,
 143    ) -> Option<Model<Worktree>> {
 144        self.worktrees()
 145            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 146    }
 147
 148    pub fn find_worktree(
 149        &self,
 150        abs_path: impl Into<SanitizedPath>,
 151        cx: &AppContext,
 152    ) -> Option<(Model<Worktree>, PathBuf)> {
 153        let abs_path: SanitizedPath = abs_path.into();
 154        for tree in self.worktrees() {
 155            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
 156                return Some((tree.clone(), relative_path.into()));
 157            }
 158        }
 159        None
 160    }
 161
 162    pub fn find_or_create_worktree(
 163        &mut self,
 164        abs_path: impl AsRef<Path>,
 165        visible: bool,
 166        cx: &mut ModelContext<Self>,
 167    ) -> Task<Result<(Model<Worktree>, PathBuf)>> {
 168        let abs_path = abs_path.as_ref();
 169        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 170            Task::ready(Ok((tree, relative_path)))
 171        } else {
 172            let worktree = self.create_worktree(abs_path, visible, cx);
 173            cx.background_executor()
 174                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 175        }
 176    }
 177
 178    pub fn entry_for_id<'a>(
 179        &'a self,
 180        entry_id: ProjectEntryId,
 181        cx: &'a AppContext,
 182    ) -> Option<&'a Entry> {
 183        self.worktrees()
 184            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 185    }
 186
 187    pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
 188        self.worktree_for_id(path.worktree_id, cx)?
 189            .read(cx)
 190            .entry_for_path(&path.path)
 191            .cloned()
 192    }
 193
 194    pub fn create_worktree(
 195        &mut self,
 196        abs_path: impl Into<SanitizedPath>,
 197        visible: bool,
 198        cx: &mut ModelContext<Self>,
 199    ) -> Task<Result<Model<Worktree>>> {
 200        let abs_path: SanitizedPath = abs_path.into();
 201        if !self.loading_worktrees.contains_key(&abs_path) {
 202            let task = match &self.state {
 203                WorktreeStoreState::Remote {
 204                    upstream_client, ..
 205                } => {
 206                    if upstream_client.is_via_collab() {
 207                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 208                    } else {
 209                        self.create_ssh_worktree(
 210                            upstream_client.clone(),
 211                            abs_path.clone(),
 212                            visible,
 213                            cx,
 214                        )
 215                    }
 216                }
 217                WorktreeStoreState::Local { fs } => {
 218                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 219                }
 220            };
 221
 222            self.loading_worktrees
 223                .insert(abs_path.clone(), task.shared());
 224        }
 225        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 226        cx.spawn(|this, mut cx| async move {
 227            let result = task.await;
 228            this.update(&mut cx, |this, _| this.loading_worktrees.remove(&abs_path))
 229                .ok();
 230            match result {
 231                Ok(worktree) => Ok(worktree),
 232                Err(err) => Err((*err).cloned()),
 233            }
 234        })
 235    }
 236
 237    fn create_ssh_worktree(
 238        &mut self,
 239        client: AnyProtoClient,
 240        abs_path: impl Into<SanitizedPath>,
 241        visible: bool,
 242        cx: &mut ModelContext<Self>,
 243    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 244        let mut abs_path = Into::<SanitizedPath>::into(abs_path).to_string();
 245        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 246        // in which case want to strip the leading the `/`.
 247        // On the host-side, the `~` will get expanded.
 248        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 249        if abs_path.starts_with("/~") {
 250            abs_path = abs_path[1..].to_string();
 251        }
 252        if abs_path.is_empty() || abs_path == "/" {
 253            abs_path = "~/".to_string();
 254        }
 255        cx.spawn(|this, mut cx| async move {
 256            let this = this.upgrade().context("Dropped worktree store")?;
 257
 258            let response = client
 259                .request(proto::AddWorktree {
 260                    project_id: SSH_PROJECT_ID,
 261                    path: abs_path.clone(),
 262                    visible,
 263                })
 264                .await?;
 265
 266            if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
 267                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 268            })? {
 269                return Ok(existing_worktree);
 270            }
 271
 272            let root_name = PathBuf::from(&response.canonicalized_path)
 273                .file_name()
 274                .map(|n| n.to_string_lossy().to_string())
 275                .unwrap_or(response.canonicalized_path.to_string());
 276
 277            let worktree = cx.update(|cx| {
 278                Worktree::remote(
 279                    SSH_PROJECT_ID,
 280                    0,
 281                    proto::WorktreeMetadata {
 282                        id: response.worktree_id,
 283                        root_name,
 284                        visible,
 285                        abs_path: response.canonicalized_path,
 286                    },
 287                    client,
 288                    cx,
 289                )
 290            })?;
 291
 292            this.update(&mut cx, |this, cx| {
 293                this.add(&worktree, cx);
 294            })?;
 295            Ok(worktree)
 296        })
 297    }
 298
 299    fn create_local_worktree(
 300        &mut self,
 301        fs: Arc<dyn Fs>,
 302        abs_path: impl Into<SanitizedPath>,
 303        visible: bool,
 304        cx: &mut ModelContext<Self>,
 305    ) -> Task<Result<Model<Worktree>, Arc<anyhow::Error>>> {
 306        let next_entry_id = self.next_entry_id.clone();
 307        let path: SanitizedPath = abs_path.into();
 308
 309        cx.spawn(move |this, mut cx| async move {
 310            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;
 311
 312            let worktree = worktree?;
 313            this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;
 314
 315            if visible {
 316                cx.update(|cx| {
 317                    cx.add_recent_document(path.as_path());
 318                })
 319                .log_err();
 320            }
 321
 322            Ok(worktree)
 323        })
 324    }
 325
 326    pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 327        let worktree_id = worktree.read(cx).id();
 328        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 329
 330        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 331        let handle = if push_strong_handle {
 332            WorktreeHandle::Strong(worktree.clone())
 333        } else {
 334            WorktreeHandle::Weak(worktree.downgrade())
 335        };
 336        if self.worktrees_reordered {
 337            self.worktrees.push(handle);
 338        } else {
 339            let i = match self
 340                .worktrees
 341                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 342                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 343                }) {
 344                Ok(i) | Err(i) => i,
 345            };
 346            self.worktrees.insert(i, handle);
 347        }
 348
 349        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 350        self.send_project_updates(cx);
 351
 352        let handle_id = worktree.entity_id();
 353        cx.observe_release(worktree, move |this, worktree, cx| {
 354            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 355                handle_id,
 356                worktree.id(),
 357            ));
 358            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 359                handle_id,
 360                worktree.id(),
 361            ));
 362            this.send_project_updates(cx);
 363        })
 364        .detach();
 365    }
 366
 367    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
 368        self.worktrees.retain(|worktree| {
 369            if let Some(worktree) = worktree.upgrade() {
 370                if worktree.read(cx).id() == id_to_remove {
 371                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 372                        worktree.entity_id(),
 373                        id_to_remove,
 374                    ));
 375                    false
 376                } else {
 377                    true
 378                }
 379            } else {
 380                false
 381            }
 382        });
 383        self.send_project_updates(cx);
 384    }
 385
 386    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 387        self.worktrees_reordered = worktrees_reordered;
 388    }
 389
 390    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 391        match &self.state {
 392            WorktreeStoreState::Remote {
 393                upstream_client,
 394                upstream_project_id,
 395                ..
 396            } => Some((upstream_client.clone(), *upstream_project_id)),
 397            WorktreeStoreState::Local { .. } => None,
 398        }
 399    }
 400
 401    pub fn set_worktrees_from_proto(
 402        &mut self,
 403        worktrees: Vec<proto::WorktreeMetadata>,
 404        replica_id: ReplicaId,
 405        cx: &mut ModelContext<Self>,
 406    ) -> Result<()> {
 407        let mut old_worktrees_by_id = self
 408            .worktrees
 409            .drain(..)
 410            .filter_map(|worktree| {
 411                let worktree = worktree.upgrade()?;
 412                Some((worktree.read(cx).id(), worktree))
 413            })
 414            .collect::<HashMap<_, _>>();
 415
 416        let (client, project_id) = self
 417            .upstream_client()
 418            .clone()
 419            .ok_or_else(|| anyhow!("invalid project"))?;
 420
 421        for worktree in worktrees {
 422            if let Some(old_worktree) =
 423                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 424            {
 425                let push_strong_handle =
 426                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 427                let handle = if push_strong_handle {
 428                    WorktreeHandle::Strong(old_worktree.clone())
 429                } else {
 430                    WorktreeHandle::Weak(old_worktree.downgrade())
 431                };
 432                self.worktrees.push(handle);
 433            } else {
 434                self.add(
 435                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 436                    cx,
 437                );
 438            }
 439        }
 440        self.send_project_updates(cx);
 441
 442        Ok(())
 443    }
 444
 445    pub fn move_worktree(
 446        &mut self,
 447        source: WorktreeId,
 448        destination: WorktreeId,
 449        cx: &mut ModelContext<Self>,
 450    ) -> Result<()> {
 451        if source == destination {
 452            return Ok(());
 453        }
 454
 455        let mut source_index = None;
 456        let mut destination_index = None;
 457        for (i, worktree) in self.worktrees.iter().enumerate() {
 458            if let Some(worktree) = worktree.upgrade() {
 459                let worktree_id = worktree.read(cx).id();
 460                if worktree_id == source {
 461                    source_index = Some(i);
 462                    if destination_index.is_some() {
 463                        break;
 464                    }
 465                } else if worktree_id == destination {
 466                    destination_index = Some(i);
 467                    if source_index.is_some() {
 468                        break;
 469                    }
 470                }
 471            }
 472        }
 473
 474        let source_index =
 475            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 476        let destination_index =
 477            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 478
 479        if source_index == destination_index {
 480            return Ok(());
 481        }
 482
 483        let worktree_to_move = self.worktrees.remove(source_index);
 484        self.worktrees.insert(destination_index, worktree_to_move);
 485        self.worktrees_reordered = true;
 486        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 487        cx.notify();
 488        Ok(())
 489    }
 490
 491    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 492        for worktree in &self.worktrees {
 493            if let Some(worktree) = worktree.upgrade() {
 494                worktree.update(cx, |worktree, _| {
 495                    if let Some(worktree) = worktree.as_remote_mut() {
 496                        worktree.disconnected_from_host();
 497                    }
 498                });
 499            }
 500        }
 501    }
 502
 503    pub fn send_project_updates(&mut self, cx: &mut ModelContext<Self>) {
 504        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 505            return;
 506        };
 507
 508        let update = proto::UpdateProject {
 509            project_id,
 510            worktrees: self.worktree_metadata_protos(cx),
 511        };
 512
 513        // collab has bad concurrency guarantees, so we send requests in serial.
 514        let update_project = if downstream_client.is_via_collab() {
 515            Some(downstream_client.request(update))
 516        } else {
 517            downstream_client.send(update).log_err();
 518            None
 519        };
 520        cx.spawn(|this, mut cx| async move {
 521            if let Some(update_project) = update_project {
 522                update_project.await?;
 523            }
 524
 525            this.update(&mut cx, |this, cx| {
 526                let worktrees = this.worktrees().collect::<Vec<_>>();
 527
 528                for worktree in worktrees {
 529                    worktree.update(cx, |worktree, cx| {
 530                        let client = downstream_client.clone();
 531                        worktree.observe_updates(project_id, cx, {
 532                            move |update| {
 533                                let client = client.clone();
 534                                async move {
 535                                    if client.is_via_collab() {
 536                                        client
 537                                            .request(update)
 538                                            .map(|result| result.log_err().is_some())
 539                                            .await
 540                                    } else {
 541                                        client.send(update).log_err().is_some()
 542                                    }
 543                                }
 544                            }
 545                        });
 546                    });
 547
 548                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 549                }
 550
 551                anyhow::Ok(())
 552            })
 553        })
 554        .detach_and_log_err(cx);
 555    }
 556
 557    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
 558        self.worktrees()
 559            .map(|worktree| {
 560                let worktree = worktree.read(cx);
 561                proto::WorktreeMetadata {
 562                    id: worktree.id().to_proto(),
 563                    root_name: worktree.root_name().into(),
 564                    visible: worktree.is_visible(),
 565                    abs_path: worktree.abs_path().to_string_lossy().into(),
 566                }
 567            })
 568            .collect()
 569    }
 570
 571    pub fn shared(
 572        &mut self,
 573        remote_id: u64,
 574        downsteam_client: AnyProtoClient,
 575        cx: &mut ModelContext<Self>,
 576    ) {
 577        self.retain_worktrees = true;
 578        self.downstream_client = Some((downsteam_client, remote_id));
 579
 580        // When shared, retain all worktrees
 581        for worktree_handle in self.worktrees.iter_mut() {
 582            match worktree_handle {
 583                WorktreeHandle::Strong(_) => {}
 584                WorktreeHandle::Weak(worktree) => {
 585                    if let Some(worktree) = worktree.upgrade() {
 586                        *worktree_handle = WorktreeHandle::Strong(worktree);
 587                    }
 588                }
 589            }
 590        }
 591        self.send_project_updates(cx);
 592    }
 593
 594    pub fn unshared(&mut self, cx: &mut ModelContext<Self>) {
 595        self.retain_worktrees = false;
 596        self.downstream_client.take();
 597
 598        // When not shared, only retain the visible worktrees
 599        for worktree_handle in self.worktrees.iter_mut() {
 600            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 601                let is_visible = worktree.update(cx, |worktree, _| {
 602                    worktree.stop_observing_updates();
 603                    worktree.is_visible()
 604                });
 605                if !is_visible {
 606                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 607                }
 608            }
 609        }
 610    }
 611
 612    /// search over all worktrees and return buffers that *might* match the search.
 613    pub fn find_search_candidates(
 614        &self,
 615        query: SearchQuery,
 616        limit: usize,
 617        open_entries: HashSet<ProjectEntryId>,
 618        fs: Arc<dyn Fs>,
 619        cx: &ModelContext<Self>,
 620    ) -> Receiver<ProjectPath> {
 621        let snapshots = self
 622            .visible_worktrees(cx)
 623            .filter_map(|tree| {
 624                let tree = tree.read(cx);
 625                Some((tree.snapshot(), tree.as_local()?.settings()))
 626            })
 627            .collect::<Vec<_>>();
 628
 629        let executor = cx.background_executor().clone();
 630
 631        // We want to return entries in the order they are in the worktrees, so we have one
 632        // thread that iterates over the worktrees (and ignored directories) as necessary,
 633        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 634        // channel.
 635        // We spawn a number of workers that take items from the filter channel and check the query
 636        // against the version of the file on disk.
 637        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 638        let (output_tx, mut output_rx) = smol::channel::bounded(64);
 639        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 640
 641        let input = cx.background_executor().spawn({
 642            let fs = fs.clone();
 643            let query = query.clone();
 644            async move {
 645                Self::find_candidate_paths(
 646                    fs,
 647                    snapshots,
 648                    open_entries,
 649                    query,
 650                    filter_tx,
 651                    output_tx,
 652                )
 653                .await
 654                .log_err();
 655            }
 656        });
 657        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 658        let filters = cx.background_executor().spawn(async move {
 659            let fs = &fs;
 660            let query = &query;
 661            executor
 662                .scoped(move |scope| {
 663                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 664                        let filter_rx = filter_rx.clone();
 665                        scope.spawn(async move {
 666                            Self::filter_paths(fs, filter_rx, query).await.log_err();
 667                        })
 668                    }
 669                })
 670                .await;
 671        });
 672        cx.background_executor()
 673            .spawn(async move {
 674                let mut matched = 0;
 675                while let Some(mut receiver) = output_rx.next().await {
 676                    let Some(path) = receiver.next().await else {
 677                        continue;
 678                    };
 679                    let Ok(_) = matching_paths_tx.send(path).await else {
 680                        break;
 681                    };
 682                    matched += 1;
 683                    if matched == limit {
 684                        break;
 685                    }
 686                }
 687                drop(input);
 688                drop(filters);
 689            })
 690            .detach();
 691        matching_paths_rx
 692    }
 693
 694    fn scan_ignored_dir<'a>(
 695        fs: &'a Arc<dyn Fs>,
 696        snapshot: &'a worktree::Snapshot,
 697        path: &'a Path,
 698        query: &'a SearchQuery,
 699        include_root: bool,
 700        filter_tx: &'a Sender<MatchingEntry>,
 701        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 702    ) -> BoxFuture<'a, Result<()>> {
 703        async move {
 704            let abs_path = snapshot.abs_path().join(path);
 705            let Some(mut files) = fs
 706                .read_dir(&abs_path)
 707                .await
 708                .with_context(|| format!("listing ignored path {abs_path:?}"))
 709                .log_err()
 710            else {
 711                return Ok(());
 712            };
 713
 714            let mut results = Vec::new();
 715
 716            while let Some(Ok(file)) = files.next().await {
 717                let Some(metadata) = fs
 718                    .metadata(&file)
 719                    .await
 720                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 721                    .log_err()
 722                    .flatten()
 723                else {
 724                    continue;
 725                };
 726                if metadata.is_symlink || metadata.is_fifo {
 727                    continue;
 728                }
 729                results.push((
 730                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 731                    !metadata.is_dir,
 732                ))
 733            }
 734            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
 735            for (path, is_file) in results {
 736                if is_file {
 737                    if query.filters_path() {
 738                        let matched_path = if include_root {
 739                            let mut full_path = PathBuf::from(snapshot.root_name());
 740                            full_path.push(&path);
 741                            query.file_matches(&full_path)
 742                        } else {
 743                            query.file_matches(&path)
 744                        };
 745                        if !matched_path {
 746                            continue;
 747                        }
 748                    }
 749                    let (tx, rx) = oneshot::channel();
 750                    output_tx.send(rx).await?;
 751                    filter_tx
 752                        .send(MatchingEntry {
 753                            respond: tx,
 754                            worktree_path: snapshot.abs_path().clone(),
 755                            path: ProjectPath {
 756                                worktree_id: snapshot.id(),
 757                                path: Arc::from(path),
 758                            },
 759                        })
 760                        .await?;
 761                } else {
 762                    Self::scan_ignored_dir(
 763                        fs,
 764                        snapshot,
 765                        &path,
 766                        query,
 767                        include_root,
 768                        filter_tx,
 769                        output_tx,
 770                    )
 771                    .await?;
 772                }
 773            }
 774            Ok(())
 775        }
 776        .boxed()
 777    }
 778
 779    async fn find_candidate_paths(
 780        fs: Arc<dyn Fs>,
 781        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 782        open_entries: HashSet<ProjectEntryId>,
 783        query: SearchQuery,
 784        filter_tx: Sender<MatchingEntry>,
 785        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 786    ) -> Result<()> {
 787        let include_root = snapshots.len() > 1;
 788        for (snapshot, settings) in snapshots {
 789            for entry in snapshot.entries(query.include_ignored(), 0) {
 790                if entry.is_dir() && entry.is_ignored {
 791                    if !settings.is_path_excluded(&entry.path) {
 792                        Self::scan_ignored_dir(
 793                            &fs,
 794                            &snapshot,
 795                            &entry.path,
 796                            &query,
 797                            include_root,
 798                            &filter_tx,
 799                            &output_tx,
 800                        )
 801                        .await?;
 802                    }
 803                    continue;
 804                }
 805
 806                if entry.is_fifo || !entry.is_file() {
 807                    continue;
 808                }
 809
 810                if query.filters_path() {
 811                    let matched_path = if include_root {
 812                        let mut full_path = PathBuf::from(snapshot.root_name());
 813                        full_path.push(&entry.path);
 814                        query.file_matches(&full_path)
 815                    } else {
 816                        query.file_matches(&entry.path)
 817                    };
 818                    if !matched_path {
 819                        continue;
 820                    }
 821                }
 822
 823                let (mut tx, rx) = oneshot::channel();
 824
 825                if open_entries.contains(&entry.id) {
 826                    tx.send(ProjectPath {
 827                        worktree_id: snapshot.id(),
 828                        path: entry.path.clone(),
 829                    })
 830                    .await?;
 831                } else {
 832                    filter_tx
 833                        .send(MatchingEntry {
 834                            respond: tx,
 835                            worktree_path: snapshot.abs_path().clone(),
 836                            path: ProjectPath {
 837                                worktree_id: snapshot.id(),
 838                                path: entry.path.clone(),
 839                            },
 840                        })
 841                        .await?;
 842                }
 843
 844                output_tx.send(rx).await?;
 845            }
 846        }
 847        Ok(())
 848    }
 849
 850    pub fn branches(
 851        &self,
 852        project_path: ProjectPath,
 853        cx: &AppContext,
 854    ) -> Task<Result<Vec<git::repository::Branch>>> {
 855        let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
 856            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 857        };
 858
 859        match worktree.read(cx) {
 860            Worktree::Local(local_worktree) => {
 861                let branches = util::maybe!({
 862                    let worktree_error = |error| {
 863                        format!(
 864                            "{} for worktree {}",
 865                            error,
 866                            local_worktree.abs_path().to_string_lossy()
 867                        )
 868                    };
 869
 870                    let entry = local_worktree
 871                        .git_entry(project_path.path)
 872                        .with_context(|| worktree_error("No git entry found"))?;
 873
 874                    let repo = local_worktree
 875                        .get_local_repo(&entry)
 876                        .with_context(|| worktree_error("No repository found"))?
 877                        .repo()
 878                        .clone();
 879
 880                    repo.branches()
 881                });
 882
 883                Task::ready(branches)
 884            }
 885            Worktree::Remote(remote_worktree) => {
 886                let request = remote_worktree.client().request(proto::GitBranches {
 887                    project_id: remote_worktree.project_id(),
 888                    repository: Some(proto::ProjectPath {
 889                        worktree_id: project_path.worktree_id.to_proto(),
 890                        path: project_path.path.to_string_lossy().to_string(), // Root path
 891                    }),
 892                });
 893
 894                cx.background_executor().spawn(async move {
 895                    let response = request.await?;
 896
 897                    let branches = response
 898                        .branches
 899                        .into_iter()
 900                        .map(|proto_branch| git::repository::Branch {
 901                            is_head: proto_branch.is_head,
 902                            name: proto_branch.name.into(),
 903                            unix_timestamp: proto_branch
 904                                .unix_timestamp
 905                                .map(|timestamp| timestamp as i64),
 906                        })
 907                        .collect();
 908
 909                    Ok(branches)
 910                })
 911            }
 912        }
 913    }
 914
 915    pub fn update_or_create_branch(
 916        &self,
 917        repository: ProjectPath,
 918        new_branch: String,
 919        cx: &AppContext,
 920    ) -> Task<Result<()>> {
 921        let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
 922            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 923        };
 924
 925        match worktree.read(cx) {
 926            Worktree::Local(local_worktree) => {
 927                let result = util::maybe!({
 928                    let worktree_error = |error| {
 929                        format!(
 930                            "{} for worktree {}",
 931                            error,
 932                            local_worktree.abs_path().to_string_lossy()
 933                        )
 934                    };
 935
 936                    let entry = local_worktree
 937                        .git_entry(repository.path)
 938                        .with_context(|| worktree_error("No git entry found"))?;
 939
 940                    let repo = local_worktree
 941                        .get_local_repo(&entry)
 942                        .with_context(|| worktree_error("No repository found"))?
 943                        .repo()
 944                        .clone();
 945
 946                    if !repo.branch_exits(&new_branch)? {
 947                        repo.create_branch(&new_branch)?;
 948                    }
 949
 950                    repo.change_branch(&new_branch)?;
 951
 952                    Ok(())
 953                });
 954
 955                Task::ready(result)
 956            }
 957            Worktree::Remote(remote_worktree) => {
 958                let request = remote_worktree.client().request(proto::UpdateGitBranch {
 959                    project_id: remote_worktree.project_id(),
 960                    repository: Some(proto::ProjectPath {
 961                        worktree_id: repository.worktree_id.to_proto(),
 962                        path: repository.path.to_string_lossy().to_string(), // Root path
 963                    }),
 964                    branch_name: new_branch,
 965                });
 966
 967                cx.background_executor().spawn(async move {
 968                    request.await?;
 969                    Ok(())
 970                })
 971            }
 972        }
 973    }
 974
 975    async fn filter_paths(
 976        fs: &Arc<dyn Fs>,
 977        mut input: Receiver<MatchingEntry>,
 978        query: &SearchQuery,
 979    ) -> Result<()> {
 980        while let Some(mut entry) = input.next().await {
 981            let abs_path = entry.worktree_path.join(&entry.path.path);
 982            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
 983                continue;
 984            };
 985            if query.detect(file).unwrap_or(false) {
 986                entry.respond.send(entry.path).await?
 987            }
 988        }
 989
 990        Ok(())
 991    }
 992
 993    pub async fn handle_create_project_entry(
 994        this: Model<Self>,
 995        envelope: TypedEnvelope<proto::CreateProjectEntry>,
 996        mut cx: AsyncAppContext,
 997    ) -> Result<proto::ProjectEntryResponse> {
 998        let worktree = this.update(&mut cx, |this, cx| {
 999            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1000            this.worktree_for_id(worktree_id, cx)
1001                .ok_or_else(|| anyhow!("worktree not found"))
1002        })??;
1003        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1004    }
1005
1006    pub async fn handle_rename_project_entry(
1007        this: Model<Self>,
1008        envelope: TypedEnvelope<proto::RenameProjectEntry>,
1009        mut cx: AsyncAppContext,
1010    ) -> Result<proto::ProjectEntryResponse> {
1011        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1012        let worktree = this.update(&mut cx, |this, cx| {
1013            this.worktree_for_entry(entry_id, cx)
1014                .ok_or_else(|| anyhow!("worktree not found"))
1015        })??;
1016        Worktree::handle_rename_entry(worktree, envelope.payload, cx).await
1017    }
1018
1019    pub async fn handle_copy_project_entry(
1020        this: Model<Self>,
1021        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1022        mut cx: AsyncAppContext,
1023    ) -> Result<proto::ProjectEntryResponse> {
1024        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1025        let worktree = this.update(&mut cx, |this, cx| {
1026            this.worktree_for_entry(entry_id, cx)
1027                .ok_or_else(|| anyhow!("worktree not found"))
1028        })??;
1029        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
1030    }
1031
1032    pub async fn handle_delete_project_entry(
1033        this: Model<Self>,
1034        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1035        mut cx: AsyncAppContext,
1036    ) -> Result<proto::ProjectEntryResponse> {
1037        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1038        let worktree = this.update(&mut cx, |this, cx| {
1039            this.worktree_for_entry(entry_id, cx)
1040                .ok_or_else(|| anyhow!("worktree not found"))
1041        })??;
1042        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1043    }
1044
1045    pub async fn handle_expand_project_entry(
1046        this: Model<Self>,
1047        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1048        mut cx: AsyncAppContext,
1049    ) -> Result<proto::ExpandProjectEntryResponse> {
1050        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1051        let worktree = this
1052            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1053            .ok_or_else(|| anyhow!("invalid request"))?;
1054        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1055    }
1056
1057    pub async fn handle_git_branches(
1058        this: Model<Self>,
1059        branches: TypedEnvelope<proto::GitBranches>,
1060        cx: AsyncAppContext,
1061    ) -> Result<proto::GitBranchesResponse> {
1062        let project_path = branches
1063            .payload
1064            .repository
1065            .clone()
1066            .context("Invalid GitBranches call")?;
1067        let project_path = ProjectPath {
1068            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1069            path: Path::new(&project_path.path).into(),
1070        };
1071
1072        let branches = this
1073            .read_with(&cx, |this, cx| this.branches(project_path, cx))?
1074            .await?;
1075
1076        Ok(proto::GitBranchesResponse {
1077            branches: branches
1078                .into_iter()
1079                .map(|branch| proto::Branch {
1080                    is_head: branch.is_head,
1081                    name: branch.name.to_string(),
1082                    unix_timestamp: branch.unix_timestamp.map(|timestamp| timestamp as u64),
1083                })
1084                .collect(),
1085        })
1086    }
1087
1088    pub async fn handle_update_branch(
1089        this: Model<Self>,
1090        update_branch: TypedEnvelope<proto::UpdateGitBranch>,
1091        cx: AsyncAppContext,
1092    ) -> Result<proto::Ack> {
1093        let project_path = update_branch
1094            .payload
1095            .repository
1096            .clone()
1097            .context("Invalid GitBranches call")?;
1098        let project_path = ProjectPath {
1099            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1100            path: Path::new(&project_path.path).into(),
1101        };
1102        let new_branch = update_branch.payload.branch_name;
1103
1104        this.read_with(&cx, |this, cx| {
1105            this.update_or_create_branch(project_path, new_branch, cx)
1106        })?
1107        .await?;
1108
1109        Ok(proto::Ack {})
1110    }
1111}
1112
1113#[derive(Clone, Debug)]
1114enum WorktreeHandle {
1115    Strong(Model<Worktree>),
1116    Weak(WeakModel<Worktree>),
1117}
1118
1119impl WorktreeHandle {
1120    fn upgrade(&self) -> Option<Model<Worktree>> {
1121        match self {
1122            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1123            WorktreeHandle::Weak(handle) => handle.upgrade(),
1124        }
1125    }
1126}