worktree_store.rs

   1use std::{
   2    io::{BufRead, BufReader},
   3    path::{Path, PathBuf},
   4    pin::pin,
   5    sync::{atomic::AtomicUsize, Arc},
   6};
   7
   8use anyhow::{anyhow, Context as _, Result};
   9use collections::{HashMap, HashSet};
  10use fs::Fs;
  11use futures::{
  12    future::{BoxFuture, Shared},
  13    FutureExt, SinkExt,
  14};
  15use git::repository::Branch;
  16use gpui::{App, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity};
  17use postage::oneshot;
  18use rpc::{
  19    proto::{self, FromProto, ToProto, SSH_PROJECT_ID},
  20    AnyProtoClient, ErrorExt, TypedEnvelope,
  21};
  22use smol::{
  23    channel::{Receiver, Sender},
  24    stream::StreamExt,
  25};
  26use text::ReplicaId;
  27use util::{paths::SanitizedPath, ResultExt};
  28use worktree::{
  29    branch_to_proto, Entry, ProjectEntryId, UpdatedEntriesSet, Worktree, WorktreeId,
  30    WorktreeSettings,
  31};
  32
  33use crate::{search::SearchQuery, ProjectPath};
  34
  35struct MatchingEntry {
  36    worktree_path: Arc<Path>,
  37    path: ProjectPath,
  38    respond: oneshot::Sender<ProjectPath>,
  39}
  40
  41enum WorktreeStoreState {
  42    Local {
  43        fs: Arc<dyn Fs>,
  44    },
  45    Remote {
  46        upstream_client: AnyProtoClient,
  47        upstream_project_id: u64,
  48    },
  49}
  50
  51pub struct WorktreeStore {
  52    next_entry_id: Arc<AtomicUsize>,
  53    downstream_client: Option<(AnyProtoClient, u64)>,
  54    retain_worktrees: bool,
  55    worktrees: Vec<WorktreeHandle>,
  56    worktrees_reordered: bool,
  57    #[allow(clippy::type_complexity)]
  58    loading_worktrees:
  59        HashMap<SanitizedPath, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
  60    state: WorktreeStoreState,
  61}
  62
  63#[derive(Debug)]
  64pub enum WorktreeStoreEvent {
  65    WorktreeAdded(Entity<Worktree>),
  66    WorktreeRemoved(EntityId, WorktreeId),
  67    WorktreeReleased(EntityId, WorktreeId),
  68    WorktreeOrderChanged,
  69    WorktreeUpdateSent(Entity<Worktree>),
  70    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
  71    WorktreeUpdatedGitRepositories(WorktreeId),
  72    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
  73}
  74
  75impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  76
  77impl WorktreeStore {
  78    pub fn init(client: &AnyProtoClient) {
  79        client.add_entity_request_handler(Self::handle_create_project_entry);
  80        client.add_entity_request_handler(Self::handle_copy_project_entry);
  81        client.add_entity_request_handler(Self::handle_delete_project_entry);
  82        client.add_entity_request_handler(Self::handle_expand_project_entry);
  83        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
  84        client.add_entity_request_handler(Self::handle_git_branches);
  85        client.add_entity_request_handler(Self::handle_update_branch);
  86    }
  87
  88    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  89        Self {
  90            next_entry_id: Default::default(),
  91            loading_worktrees: Default::default(),
  92            downstream_client: None,
  93            worktrees: Vec::new(),
  94            worktrees_reordered: false,
  95            retain_worktrees,
  96            state: WorktreeStoreState::Local { fs },
  97        }
  98    }
  99
 100    pub fn remote(
 101        retain_worktrees: bool,
 102        upstream_client: AnyProtoClient,
 103        upstream_project_id: u64,
 104    ) -> Self {
 105        Self {
 106            next_entry_id: Default::default(),
 107            loading_worktrees: Default::default(),
 108            downstream_client: None,
 109            worktrees: Vec::new(),
 110            worktrees_reordered: false,
 111            retain_worktrees,
 112            state: WorktreeStoreState::Remote {
 113                upstream_client,
 114                upstream_project_id,
 115            },
 116        }
 117    }
 118
 119    /// Iterates through all worktrees, including ones that don't appear in the project panel
 120    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 121        self.worktrees
 122            .iter()
 123            .filter_map(move |worktree| worktree.upgrade())
 124    }
 125
 126    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 127    pub fn visible_worktrees<'a>(
 128        &'a self,
 129        cx: &'a App,
 130    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 131        self.worktrees()
 132            .filter(|worktree| worktree.read(cx).is_visible())
 133    }
 134
 135    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 136        self.worktrees()
 137            .find(|worktree| worktree.read(cx).id() == id)
 138    }
 139
 140    pub fn current_branch(&self, repository: ProjectPath, cx: &App) -> Option<Branch> {
 141        self.worktree_for_id(repository.worktree_id, cx)?
 142            .read(cx)
 143            .git_entry(repository.path)?
 144            .branch()
 145            .cloned()
 146    }
 147
 148    pub fn worktree_for_entry(
 149        &self,
 150        entry_id: ProjectEntryId,
 151        cx: &App,
 152    ) -> Option<Entity<Worktree>> {
 153        self.worktrees()
 154            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 155    }
 156
 157    pub fn find_worktree(
 158        &self,
 159        abs_path: impl Into<SanitizedPath>,
 160        cx: &App,
 161    ) -> Option<(Entity<Worktree>, PathBuf)> {
 162        let abs_path: SanitizedPath = abs_path.into();
 163        for tree in self.worktrees() {
 164            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
 165                return Some((tree.clone(), relative_path.into()));
 166            }
 167        }
 168        None
 169    }
 170
 171    pub fn find_or_create_worktree(
 172        &mut self,
 173        abs_path: impl AsRef<Path>,
 174        visible: bool,
 175        cx: &mut Context<Self>,
 176    ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
 177        let abs_path = abs_path.as_ref();
 178        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 179            Task::ready(Ok((tree, relative_path)))
 180        } else {
 181            let worktree = self.create_worktree(abs_path, visible, cx);
 182            cx.background_executor()
 183                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 184        }
 185    }
 186
 187    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 188        self.worktrees()
 189            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 190    }
 191
 192    pub fn worktree_and_entry_for_id<'a>(
 193        &'a self,
 194        entry_id: ProjectEntryId,
 195        cx: &'a App,
 196    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 197        self.worktrees().find_map(|worktree| {
 198            worktree
 199                .read(cx)
 200                .entry_for_id(entry_id)
 201                .map(|e| (worktree.clone(), e))
 202        })
 203    }
 204
 205    pub fn entry_for_path(&self, path: &ProjectPath, cx: &App) -> Option<Entry> {
 206        self.worktree_for_id(path.worktree_id, cx)?
 207            .read(cx)
 208            .entry_for_path(&path.path)
 209            .cloned()
 210    }
 211
 212    pub fn create_worktree(
 213        &mut self,
 214        abs_path: impl Into<SanitizedPath>,
 215        visible: bool,
 216        cx: &mut Context<Self>,
 217    ) -> Task<Result<Entity<Worktree>>> {
 218        let abs_path: SanitizedPath = abs_path.into();
 219        if !self.loading_worktrees.contains_key(&abs_path) {
 220            let task = match &self.state {
 221                WorktreeStoreState::Remote {
 222                    upstream_client, ..
 223                } => {
 224                    if upstream_client.is_via_collab() {
 225                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 226                    } else {
 227                        self.create_ssh_worktree(
 228                            upstream_client.clone(),
 229                            abs_path.clone(),
 230                            visible,
 231                            cx,
 232                        )
 233                    }
 234                }
 235                WorktreeStoreState::Local { fs } => {
 236                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 237                }
 238            };
 239
 240            self.loading_worktrees
 241                .insert(abs_path.clone(), task.shared());
 242        }
 243        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 244        cx.spawn(|this, mut cx| async move {
 245            let result = task.await;
 246            this.update(&mut cx, |this, _| this.loading_worktrees.remove(&abs_path))
 247                .ok();
 248            match result {
 249                Ok(worktree) => Ok(worktree),
 250                Err(err) => Err((*err).cloned()),
 251            }
 252        })
 253    }
 254
 255    fn create_ssh_worktree(
 256        &mut self,
 257        client: AnyProtoClient,
 258        abs_path: impl Into<SanitizedPath>,
 259        visible: bool,
 260        cx: &mut Context<Self>,
 261    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 262        let mut abs_path = Into::<SanitizedPath>::into(abs_path).to_string();
 263        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 264        // in which case want to strip the leading the `/`.
 265        // On the host-side, the `~` will get expanded.
 266        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 267        if abs_path.starts_with("/~") {
 268            abs_path = abs_path[1..].to_string();
 269        }
 270        if abs_path.is_empty() || abs_path == "/" {
 271            abs_path = "~/".to_string();
 272        }
 273        cx.spawn(|this, mut cx| async move {
 274            let this = this.upgrade().context("Dropped worktree store")?;
 275
 276            let path = Path::new(abs_path.as_str());
 277            let response = client
 278                .request(proto::AddWorktree {
 279                    project_id: SSH_PROJECT_ID,
 280                    path: path.to_proto(),
 281                    visible,
 282                })
 283                .await?;
 284
 285            if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
 286                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 287            })? {
 288                return Ok(existing_worktree);
 289            }
 290
 291            let root_path_buf = PathBuf::from_proto(response.canonicalized_path.clone());
 292            let root_name = root_path_buf
 293                .file_name()
 294                .map(|n| n.to_string_lossy().to_string())
 295                .unwrap_or(root_path_buf.to_string_lossy().to_string());
 296
 297            let worktree = cx.update(|cx| {
 298                Worktree::remote(
 299                    SSH_PROJECT_ID,
 300                    0,
 301                    proto::WorktreeMetadata {
 302                        id: response.worktree_id,
 303                        root_name,
 304                        visible,
 305                        abs_path: response.canonicalized_path,
 306                    },
 307                    client,
 308                    cx,
 309                )
 310            })?;
 311
 312            this.update(&mut cx, |this, cx| {
 313                this.add(&worktree, cx);
 314            })?;
 315            Ok(worktree)
 316        })
 317    }
 318
 319    fn create_local_worktree(
 320        &mut self,
 321        fs: Arc<dyn Fs>,
 322        abs_path: impl Into<SanitizedPath>,
 323        visible: bool,
 324        cx: &mut Context<Self>,
 325    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 326        let next_entry_id = self.next_entry_id.clone();
 327        let path: SanitizedPath = abs_path.into();
 328
 329        cx.spawn(move |this, mut cx| async move {
 330            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;
 331
 332            let worktree = worktree?;
 333
 334            this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;
 335
 336            if visible {
 337                cx.update(|cx| {
 338                    cx.add_recent_document(path.as_path());
 339                })
 340                .log_err();
 341            }
 342
 343            Ok(worktree)
 344        })
 345    }
 346
 347    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 348        let worktree_id = worktree.read(cx).id();
 349        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 350
 351        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 352        let handle = if push_strong_handle {
 353            WorktreeHandle::Strong(worktree.clone())
 354        } else {
 355            WorktreeHandle::Weak(worktree.downgrade())
 356        };
 357        if self.worktrees_reordered {
 358            self.worktrees.push(handle);
 359        } else {
 360            let i = match self
 361                .worktrees
 362                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 363                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 364                }) {
 365                Ok(i) | Err(i) => i,
 366            };
 367            self.worktrees.insert(i, handle);
 368        }
 369
 370        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 371        self.send_project_updates(cx);
 372
 373        let handle_id = worktree.entity_id();
 374        cx.subscribe(worktree, |_, worktree, event, cx| {
 375            let worktree_id = worktree.update(cx, |worktree, _| worktree.id());
 376            match event {
 377                worktree::Event::UpdatedEntries(changes) => {
 378                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 379                        worktree.read(cx).id(),
 380                        changes.clone(),
 381                    ));
 382                }
 383                worktree::Event::UpdatedGitRepositories(_) => {
 384                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 385                        worktree_id,
 386                    ));
 387                }
 388                worktree::Event::DeletedEntry(id) => {
 389                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 390                }
 391            }
 392        })
 393        .detach();
 394        cx.observe_release(worktree, move |this, worktree, cx| {
 395            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 396                handle_id,
 397                worktree.id(),
 398            ));
 399            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 400                handle_id,
 401                worktree.id(),
 402            ));
 403            this.send_project_updates(cx);
 404        })
 405        .detach();
 406    }
 407
 408    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 409        self.worktrees.retain(|worktree| {
 410            if let Some(worktree) = worktree.upgrade() {
 411                if worktree.read(cx).id() == id_to_remove {
 412                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 413                        worktree.entity_id(),
 414                        id_to_remove,
 415                    ));
 416                    false
 417                } else {
 418                    true
 419                }
 420            } else {
 421                false
 422            }
 423        });
 424        self.send_project_updates(cx);
 425    }
 426
 427    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 428        self.worktrees_reordered = worktrees_reordered;
 429    }
 430
 431    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 432        match &self.state {
 433            WorktreeStoreState::Remote {
 434                upstream_client,
 435                upstream_project_id,
 436                ..
 437            } => Some((upstream_client.clone(), *upstream_project_id)),
 438            WorktreeStoreState::Local { .. } => None,
 439        }
 440    }
 441
 442    pub fn set_worktrees_from_proto(
 443        &mut self,
 444        worktrees: Vec<proto::WorktreeMetadata>,
 445        replica_id: ReplicaId,
 446        cx: &mut Context<Self>,
 447    ) -> Result<()> {
 448        let mut old_worktrees_by_id = self
 449            .worktrees
 450            .drain(..)
 451            .filter_map(|worktree| {
 452                let worktree = worktree.upgrade()?;
 453                Some((worktree.read(cx).id(), worktree))
 454            })
 455            .collect::<HashMap<_, _>>();
 456
 457        let (client, project_id) = self
 458            .upstream_client()
 459            .clone()
 460            .ok_or_else(|| anyhow!("invalid project"))?;
 461
 462        for worktree in worktrees {
 463            if let Some(old_worktree) =
 464                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 465            {
 466                let push_strong_handle =
 467                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 468                let handle = if push_strong_handle {
 469                    WorktreeHandle::Strong(old_worktree.clone())
 470                } else {
 471                    WorktreeHandle::Weak(old_worktree.downgrade())
 472                };
 473                self.worktrees.push(handle);
 474            } else {
 475                self.add(
 476                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 477                    cx,
 478                );
 479            }
 480        }
 481        self.send_project_updates(cx);
 482
 483        Ok(())
 484    }
 485
 486    pub fn move_worktree(
 487        &mut self,
 488        source: WorktreeId,
 489        destination: WorktreeId,
 490        cx: &mut Context<Self>,
 491    ) -> Result<()> {
 492        if source == destination {
 493            return Ok(());
 494        }
 495
 496        let mut source_index = None;
 497        let mut destination_index = None;
 498        for (i, worktree) in self.worktrees.iter().enumerate() {
 499            if let Some(worktree) = worktree.upgrade() {
 500                let worktree_id = worktree.read(cx).id();
 501                if worktree_id == source {
 502                    source_index = Some(i);
 503                    if destination_index.is_some() {
 504                        break;
 505                    }
 506                } else if worktree_id == destination {
 507                    destination_index = Some(i);
 508                    if source_index.is_some() {
 509                        break;
 510                    }
 511                }
 512            }
 513        }
 514
 515        let source_index =
 516            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 517        let destination_index =
 518            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 519
 520        if source_index == destination_index {
 521            return Ok(());
 522        }
 523
 524        let worktree_to_move = self.worktrees.remove(source_index);
 525        self.worktrees.insert(destination_index, worktree_to_move);
 526        self.worktrees_reordered = true;
 527        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 528        cx.notify();
 529        Ok(())
 530    }
 531
 532    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 533        for worktree in &self.worktrees {
 534            if let Some(worktree) = worktree.upgrade() {
 535                worktree.update(cx, |worktree, _| {
 536                    if let Some(worktree) = worktree.as_remote_mut() {
 537                        worktree.disconnected_from_host();
 538                    }
 539                });
 540            }
 541        }
 542    }
 543
 544    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 545        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 546            return;
 547        };
 548
 549        let update = proto::UpdateProject {
 550            project_id,
 551            worktrees: self.worktree_metadata_protos(cx),
 552        };
 553
 554        // collab has bad concurrency guarantees, so we send requests in serial.
 555        let update_project = if downstream_client.is_via_collab() {
 556            Some(downstream_client.request(update))
 557        } else {
 558            downstream_client.send(update).log_err();
 559            None
 560        };
 561        cx.spawn(|this, mut cx| async move {
 562            if let Some(update_project) = update_project {
 563                update_project.await?;
 564            }
 565
 566            this.update(&mut cx, |this, cx| {
 567                let worktrees = this.worktrees().collect::<Vec<_>>();
 568
 569                for worktree in worktrees {
 570                    worktree.update(cx, |worktree, cx| {
 571                        let client = downstream_client.clone();
 572                        worktree.observe_updates(project_id, cx, {
 573                            move |update| {
 574                                let client = client.clone();
 575                                async move {
 576                                    if client.is_via_collab() {
 577                                        client
 578                                            .request(update)
 579                                            .map(|result| result.log_err().is_some())
 580                                            .await
 581                                    } else {
 582                                        client.send(update).log_err().is_some()
 583                                    }
 584                                }
 585                            }
 586                        });
 587                    });
 588
 589                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 590                }
 591
 592                anyhow::Ok(())
 593            })
 594        })
 595        .detach_and_log_err(cx);
 596    }
 597
 598    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
 599        self.worktrees()
 600            .map(|worktree| {
 601                let worktree = worktree.read(cx);
 602                proto::WorktreeMetadata {
 603                    id: worktree.id().to_proto(),
 604                    root_name: worktree.root_name().into(),
 605                    visible: worktree.is_visible(),
 606                    abs_path: worktree.abs_path().to_proto(),
 607                }
 608            })
 609            .collect()
 610    }
 611
 612    pub fn shared(
 613        &mut self,
 614        remote_id: u64,
 615        downstream_client: AnyProtoClient,
 616        cx: &mut Context<Self>,
 617    ) {
 618        self.retain_worktrees = true;
 619        self.downstream_client = Some((downstream_client, remote_id));
 620
 621        // When shared, retain all worktrees
 622        for worktree_handle in self.worktrees.iter_mut() {
 623            match worktree_handle {
 624                WorktreeHandle::Strong(_) => {}
 625                WorktreeHandle::Weak(worktree) => {
 626                    if let Some(worktree) = worktree.upgrade() {
 627                        *worktree_handle = WorktreeHandle::Strong(worktree);
 628                    }
 629                }
 630            }
 631        }
 632        self.send_project_updates(cx);
 633    }
 634
 635    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 636        self.retain_worktrees = false;
 637        self.downstream_client.take();
 638
 639        // When not shared, only retain the visible worktrees
 640        for worktree_handle in self.worktrees.iter_mut() {
 641            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 642                let is_visible = worktree.update(cx, |worktree, _| {
 643                    worktree.stop_observing_updates();
 644                    worktree.is_visible()
 645                });
 646                if !is_visible {
 647                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 648                }
 649            }
 650        }
 651    }
 652
 653    /// search over all worktrees and return buffers that *might* match the search.
 654    pub fn find_search_candidates(
 655        &self,
 656        query: SearchQuery,
 657        limit: usize,
 658        open_entries: HashSet<ProjectEntryId>,
 659        fs: Arc<dyn Fs>,
 660        cx: &Context<Self>,
 661    ) -> Receiver<ProjectPath> {
 662        let snapshots = self
 663            .visible_worktrees(cx)
 664            .filter_map(|tree| {
 665                let tree = tree.read(cx);
 666                Some((tree.snapshot(), tree.as_local()?.settings()))
 667            })
 668            .collect::<Vec<_>>();
 669
 670        let executor = cx.background_executor().clone();
 671
 672        // We want to return entries in the order they are in the worktrees, so we have one
 673        // thread that iterates over the worktrees (and ignored directories) as necessary,
 674        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 675        // channel.
 676        // We spawn a number of workers that take items from the filter channel and check the query
 677        // against the version of the file on disk.
 678        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 679        let (output_tx, output_rx) = smol::channel::bounded(64);
 680        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 681
 682        let input = cx.background_executor().spawn({
 683            let fs = fs.clone();
 684            let query = query.clone();
 685            async move {
 686                Self::find_candidate_paths(
 687                    fs,
 688                    snapshots,
 689                    open_entries,
 690                    query,
 691                    filter_tx,
 692                    output_tx,
 693                )
 694                .await
 695                .log_err();
 696            }
 697        });
 698        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 699        let filters = cx.background_executor().spawn(async move {
 700            let fs = &fs;
 701            let query = &query;
 702            executor
 703                .scoped(move |scope| {
 704                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 705                        let filter_rx = filter_rx.clone();
 706                        scope.spawn(async move {
 707                            Self::filter_paths(fs, filter_rx, query)
 708                                .await
 709                                .log_with_level(log::Level::Debug);
 710                        })
 711                    }
 712                })
 713                .await;
 714        });
 715        cx.background_executor()
 716            .spawn(async move {
 717                let mut matched = 0;
 718                while let Ok(mut receiver) = output_rx.recv().await {
 719                    let Some(path) = receiver.next().await else {
 720                        continue;
 721                    };
 722                    let Ok(_) = matching_paths_tx.send(path).await else {
 723                        break;
 724                    };
 725                    matched += 1;
 726                    if matched == limit {
 727                        break;
 728                    }
 729                }
 730                drop(input);
 731                drop(filters);
 732            })
 733            .detach();
 734        matching_paths_rx
 735    }
 736
 737    fn scan_ignored_dir<'a>(
 738        fs: &'a Arc<dyn Fs>,
 739        snapshot: &'a worktree::Snapshot,
 740        path: &'a Path,
 741        query: &'a SearchQuery,
 742        include_root: bool,
 743        filter_tx: &'a Sender<MatchingEntry>,
 744        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 745    ) -> BoxFuture<'a, Result<()>> {
 746        async move {
 747            let abs_path = snapshot.abs_path().join(path);
 748            let Some(mut files) = fs
 749                .read_dir(&abs_path)
 750                .await
 751                .with_context(|| format!("listing ignored path {abs_path:?}"))
 752                .log_err()
 753            else {
 754                return Ok(());
 755            };
 756
 757            let mut results = Vec::new();
 758
 759            while let Some(Ok(file)) = files.next().await {
 760                let Some(metadata) = fs
 761                    .metadata(&file)
 762                    .await
 763                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 764                    .log_err()
 765                    .flatten()
 766                else {
 767                    continue;
 768                };
 769                if metadata.is_symlink || metadata.is_fifo {
 770                    continue;
 771                }
 772                results.push((
 773                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 774                    !metadata.is_dir,
 775                ))
 776            }
 777            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
 778            for (path, is_file) in results {
 779                if is_file {
 780                    if query.filters_path() {
 781                        let matched_path = if include_root {
 782                            let mut full_path = PathBuf::from(snapshot.root_name());
 783                            full_path.push(&path);
 784                            query.file_matches(&full_path)
 785                        } else {
 786                            query.file_matches(&path)
 787                        };
 788                        if !matched_path {
 789                            continue;
 790                        }
 791                    }
 792                    let (tx, rx) = oneshot::channel();
 793                    output_tx.send(rx).await?;
 794                    filter_tx
 795                        .send(MatchingEntry {
 796                            respond: tx,
 797                            worktree_path: snapshot.abs_path().clone(),
 798                            path: ProjectPath {
 799                                worktree_id: snapshot.id(),
 800                                path: Arc::from(path),
 801                            },
 802                        })
 803                        .await?;
 804                } else {
 805                    Self::scan_ignored_dir(
 806                        fs,
 807                        snapshot,
 808                        &path,
 809                        query,
 810                        include_root,
 811                        filter_tx,
 812                        output_tx,
 813                    )
 814                    .await?;
 815                }
 816            }
 817            Ok(())
 818        }
 819        .boxed()
 820    }
 821
 822    async fn find_candidate_paths(
 823        fs: Arc<dyn Fs>,
 824        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 825        open_entries: HashSet<ProjectEntryId>,
 826        query: SearchQuery,
 827        filter_tx: Sender<MatchingEntry>,
 828        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 829    ) -> Result<()> {
 830        let include_root = snapshots.len() > 1;
 831        for (snapshot, settings) in snapshots {
 832            for entry in snapshot.entries(query.include_ignored(), 0) {
 833                if entry.is_dir() && entry.is_ignored {
 834                    if !settings.is_path_excluded(&entry.path) {
 835                        Self::scan_ignored_dir(
 836                            &fs,
 837                            &snapshot,
 838                            &entry.path,
 839                            &query,
 840                            include_root,
 841                            &filter_tx,
 842                            &output_tx,
 843                        )
 844                        .await?;
 845                    }
 846                    continue;
 847                }
 848
 849                if entry.is_fifo || !entry.is_file() {
 850                    continue;
 851                }
 852
 853                if query.filters_path() {
 854                    let matched_path = if include_root {
 855                        let mut full_path = PathBuf::from(snapshot.root_name());
 856                        full_path.push(&entry.path);
 857                        query.file_matches(&full_path)
 858                    } else {
 859                        query.file_matches(&entry.path)
 860                    };
 861                    if !matched_path {
 862                        continue;
 863                    }
 864                }
 865
 866                let (mut tx, rx) = oneshot::channel();
 867
 868                if open_entries.contains(&entry.id) {
 869                    tx.send(ProjectPath {
 870                        worktree_id: snapshot.id(),
 871                        path: entry.path.clone(),
 872                    })
 873                    .await?;
 874                } else {
 875                    filter_tx
 876                        .send(MatchingEntry {
 877                            respond: tx,
 878                            worktree_path: snapshot.abs_path().clone(),
 879                            path: ProjectPath {
 880                                worktree_id: snapshot.id(),
 881                                path: entry.path.clone(),
 882                            },
 883                        })
 884                        .await?;
 885                }
 886
 887                output_tx.send(rx).await?;
 888            }
 889        }
 890        Ok(())
 891    }
 892
 893    pub fn branches(
 894        &self,
 895        project_path: ProjectPath,
 896        cx: &App,
 897    ) -> Task<Result<Vec<git::repository::Branch>>> {
 898        let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
 899            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 900        };
 901
 902        match worktree.read(cx) {
 903            Worktree::Local(local_worktree) => {
 904                let branches = util::maybe!({
 905                    let worktree_error = |error| {
 906                        format!(
 907                            "{} for worktree {}",
 908                            error,
 909                            local_worktree.abs_path().to_string_lossy()
 910                        )
 911                    };
 912
 913                    let entry = local_worktree
 914                        .git_entry(project_path.path)
 915                        .with_context(|| worktree_error("No git entry found"))?;
 916
 917                    let repo = local_worktree
 918                        .get_local_repo(&entry)
 919                        .with_context(|| worktree_error("No repository found"))?
 920                        .repo()
 921                        .clone();
 922
 923                    repo.branches()
 924                });
 925
 926                Task::ready(branches)
 927            }
 928            Worktree::Remote(remote_worktree) => {
 929                let request = remote_worktree.client().request(proto::GitBranches {
 930                    project_id: remote_worktree.project_id(),
 931                    repository: Some(proto::ProjectPath {
 932                        worktree_id: project_path.worktree_id.to_proto(),
 933                        path: project_path.path.to_proto(), // Root path
 934                    }),
 935                });
 936
 937                cx.background_executor().spawn(async move {
 938                    let response = request.await?;
 939
 940                    let branches = response
 941                        .branches
 942                        .into_iter()
 943                        .map(|proto_branch| git::repository::Branch {
 944                            is_head: proto_branch.is_head,
 945                            name: proto_branch.name.into(),
 946                            upstream: proto_branch.upstream.map(|upstream| {
 947                                git::repository::Upstream {
 948                                    ref_name: upstream.ref_name.into(),
 949                                    tracking: upstream.tracking.map(|tracking| {
 950                                        git::repository::UpstreamTracking {
 951                                            ahead: tracking.ahead as u32,
 952                                            behind: tracking.behind as u32,
 953                                        }
 954                                    }),
 955                                }
 956                            }),
 957                            most_recent_commit: proto_branch.most_recent_commit.map(|commit| {
 958                                git::repository::CommitSummary {
 959                                    sha: commit.sha.into(),
 960                                    subject: commit.subject.into(),
 961                                    commit_timestamp: commit.commit_timestamp,
 962                                }
 963                            }),
 964                        })
 965                        .collect();
 966
 967                    Ok(branches)
 968                })
 969            }
 970        }
 971    }
 972
 973    pub fn update_or_create_branch(
 974        &self,
 975        repository: ProjectPath,
 976        new_branch: String,
 977        cx: &App,
 978    ) -> Task<Result<()>> {
 979        let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
 980            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
 981        };
 982
 983        match worktree.read(cx) {
 984            Worktree::Local(local_worktree) => {
 985                let result = util::maybe!({
 986                    let worktree_error = |error| {
 987                        format!(
 988                            "{} for worktree {}",
 989                            error,
 990                            local_worktree.abs_path().to_string_lossy()
 991                        )
 992                    };
 993
 994                    let entry = local_worktree
 995                        .git_entry(repository.path)
 996                        .with_context(|| worktree_error("No git entry found"))?;
 997
 998                    let repo = local_worktree
 999                        .get_local_repo(&entry)
1000                        .with_context(|| worktree_error("No repository found"))?
1001                        .repo()
1002                        .clone();
1003
1004                    if !repo.branch_exits(&new_branch)? {
1005                        repo.create_branch(&new_branch)?;
1006                    }
1007
1008                    repo.change_branch(&new_branch)?;
1009                    Ok(())
1010                });
1011
1012                Task::ready(result)
1013            }
1014            Worktree::Remote(remote_worktree) => {
1015                let request = remote_worktree.client().request(proto::UpdateGitBranch {
1016                    project_id: remote_worktree.project_id(),
1017                    repository: Some(proto::ProjectPath {
1018                        worktree_id: repository.worktree_id.to_proto(),
1019                        path: repository.path.to_proto(), // Root path
1020                    }),
1021                    branch_name: new_branch,
1022                });
1023
1024                cx.background_executor().spawn(async move {
1025                    request.await?;
1026                    Ok(())
1027                })
1028            }
1029        }
1030    }
1031
1032    async fn filter_paths(
1033        fs: &Arc<dyn Fs>,
1034        mut input: Receiver<MatchingEntry>,
1035        query: &SearchQuery,
1036    ) -> Result<()> {
1037        let mut input = pin!(input);
1038        while let Some(mut entry) = input.next().await {
1039            let abs_path = entry.worktree_path.join(&entry.path.path);
1040            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
1041                continue;
1042            };
1043
1044            let mut file = BufReader::new(file);
1045            let file_start = file.fill_buf()?;
1046
1047            if let Err(Some(starting_position)) =
1048                std::str::from_utf8(file_start).map_err(|e| e.error_len())
1049            {
1050                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1051                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1052                log::debug!(
1053                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1054                );
1055                continue;
1056            }
1057
1058            if query.detect(file).unwrap_or(false) {
1059                entry.respond.send(entry.path).await?
1060            }
1061        }
1062
1063        Ok(())
1064    }
1065
1066    pub async fn handle_create_project_entry(
1067        this: Entity<Self>,
1068        envelope: TypedEnvelope<proto::CreateProjectEntry>,
1069        mut cx: AsyncApp,
1070    ) -> Result<proto::ProjectEntryResponse> {
1071        let worktree = this.update(&mut cx, |this, cx| {
1072            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1073            this.worktree_for_id(worktree_id, cx)
1074                .ok_or_else(|| anyhow!("worktree not found"))
1075        })??;
1076        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1077    }
1078
1079    pub async fn handle_copy_project_entry(
1080        this: Entity<Self>,
1081        envelope: TypedEnvelope<proto::CopyProjectEntry>,
1082        mut cx: AsyncApp,
1083    ) -> Result<proto::ProjectEntryResponse> {
1084        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1085        let worktree = this.update(&mut cx, |this, cx| {
1086            this.worktree_for_entry(entry_id, cx)
1087                .ok_or_else(|| anyhow!("worktree not found"))
1088        })??;
1089        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
1090    }
1091
1092    pub async fn handle_delete_project_entry(
1093        this: Entity<Self>,
1094        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1095        mut cx: AsyncApp,
1096    ) -> Result<proto::ProjectEntryResponse> {
1097        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1098        let worktree = this.update(&mut cx, |this, cx| {
1099            this.worktree_for_entry(entry_id, cx)
1100                .ok_or_else(|| anyhow!("worktree not found"))
1101        })??;
1102        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1103    }
1104
1105    pub async fn handle_expand_project_entry(
1106        this: Entity<Self>,
1107        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1108        mut cx: AsyncApp,
1109    ) -> Result<proto::ExpandProjectEntryResponse> {
1110        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1111        let worktree = this
1112            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1113            .ok_or_else(|| anyhow!("invalid request"))?;
1114        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1115    }
1116
1117    pub async fn handle_expand_all_for_project_entry(
1118        this: Entity<Self>,
1119        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1120        mut cx: AsyncApp,
1121    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1122        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1123        let worktree = this
1124            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1125            .ok_or_else(|| anyhow!("invalid request"))?;
1126        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1127    }
1128
1129    pub async fn handle_git_branches(
1130        this: Entity<Self>,
1131        branches: TypedEnvelope<proto::GitBranches>,
1132        cx: AsyncApp,
1133    ) -> Result<proto::GitBranchesResponse> {
1134        let project_path = branches
1135            .payload
1136            .repository
1137            .clone()
1138            .context("Invalid GitBranches call")?;
1139        let project_path = ProjectPath {
1140            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1141            path: Arc::<Path>::from_proto(project_path.path),
1142        };
1143
1144        let branches = this
1145            .read_with(&cx, |this, cx| this.branches(project_path, cx))?
1146            .await?;
1147
1148        Ok(proto::GitBranchesResponse {
1149            branches: branches.iter().map(branch_to_proto).collect(),
1150        })
1151    }
1152
1153    pub async fn handle_update_branch(
1154        this: Entity<Self>,
1155        update_branch: TypedEnvelope<proto::UpdateGitBranch>,
1156        cx: AsyncApp,
1157    ) -> Result<proto::Ack> {
1158        let project_path = update_branch
1159            .payload
1160            .repository
1161            .clone()
1162            .context("Invalid GitBranches call")?;
1163        let project_path = ProjectPath {
1164            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1165            path: Arc::<Path>::from_proto(project_path.path),
1166        };
1167        let new_branch = update_branch.payload.branch_name;
1168
1169        this.read_with(&cx, |this, cx| {
1170            this.update_or_create_branch(project_path, new_branch, cx)
1171        })?
1172        .await?;
1173
1174        Ok(proto::Ack {})
1175    }
1176}
1177
1178#[derive(Clone, Debug)]
1179enum WorktreeHandle {
1180    Strong(Entity<Worktree>),
1181    Weak(WeakEntity<Worktree>),
1182}
1183
1184impl WorktreeHandle {
1185    fn upgrade(&self) -> Option<Entity<Worktree>> {
1186        match self {
1187            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1188            WorktreeHandle::Weak(handle) => handle.upgrade(),
1189        }
1190    }
1191}