worktree_store.rs

   1use std::{
   2    io::{BufRead, BufReader},
   3    path::{Path, PathBuf},
   4    pin::pin,
   5    sync::{atomic::AtomicUsize, Arc},
   6};
   7
   8use anyhow::{anyhow, Context as _, Result};
   9use collections::{HashMap, HashSet};
  10use fs::Fs;
  11use futures::{
  12    future::{BoxFuture, Shared},
  13    FutureExt, SinkExt,
  14};
  15use gpui::{
  16    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
  17};
  18use postage::oneshot;
  19use rpc::{
  20    proto::{self, FromProto, ToProto, SSH_PROJECT_ID},
  21    AnyProtoClient, ErrorExt, TypedEnvelope,
  22};
  23use smol::{
  24    channel::{Receiver, Sender},
  25    stream::StreamExt,
  26};
  27use text::ReplicaId;
  28use util::{paths::SanitizedPath, ResultExt};
  29use worktree::{Entry, ProjectEntryId, UpdatedEntriesSet, Worktree, WorktreeId, WorktreeSettings};
  30
  31use crate::{search::SearchQuery, ProjectPath};
  32
    /// A single candidate file queued on the filter channel during a project search.
  33struct MatchingEntry {
        /// Absolute path of the root of the worktree the candidate belongs to.
  34    worktree_path: Arc<Path>,
        /// Worktree id together with the candidate's path relative to that worktree.
  35    path: ProjectPath,
        /// Sending half of the one-shot channel whose receiver was pushed to the output channel.
        /// NOTE(review): presumably the filter worker sends `path` here when the file matches —
        /// confirm against `filter_paths`.
  36    respond: oneshot::Sender<ProjectPath>,
  37}
  38
    /// Where the worktrees of a store come from; decides how new worktrees are created.
  39enum WorktreeStoreState {
        /// Worktrees are created directly on top of a file system.
  40    Local {
            /// File system handed to `Worktree::local` when a worktree is created.
  41        fs: Arc<dyn Fs>,
  42    },
        /// Worktrees mirror a project hosted behind an upstream client.
  43    Remote {
            /// Client used to reach the upstream side.
  44        upstream_client: AnyProtoClient,
            /// Project id used when building remote worktrees from upstream metadata.
  45        upstream_project_id: u64,
  46    },
  47}
  48
    /// Holds the worktrees of a project, creates new ones, and forwards their metadata and
    /// events to an optional downstream client.
  49pub struct WorktreeStore {
        /// Shared counter passed to every `Worktree::local` created by this store.
  50    next_entry_id: Arc<AtomicUsize>,
        /// Client and project id that receive project updates; set by `shared`, cleared by
        /// `unshared`.
  51    downstream_client: Option<(AnyProtoClient, u64)>,
        /// When true every worktree is held strongly; otherwise only visible worktrees are.
  52    retain_worktrees: bool,
        /// Worktree handles in their current order.
  53    worktrees: Vec<WorktreeHandle>,
        /// Set once `move_worktree` changed the order (or via `set_worktrees_reordered`); new
        /// worktrees are then appended instead of being inserted by absolute path.
  54    worktrees_reordered: bool,
        /// In-flight worktree creations keyed by path, so that concurrent requests for the same
        /// path share one task.
  55    #[allow(clippy::type_complexity)]
  56    loading_worktrees:
  57        HashMap<SanitizedPath, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
        /// Local or remote backing of this store.
  58    state: WorktreeStoreState,
  59}
  60
    /// Events emitted by a `WorktreeStore`.
  61#[derive(Debug)]
  62pub enum WorktreeStoreEvent {
        /// A worktree was added to the store.
  63    WorktreeAdded(Entity<Worktree>),
        /// A worktree left the store, either through `remove_worktree` or because it was released.
  64    WorktreeRemoved(EntityId, WorktreeId),
        /// A worktree entity was released; emitted right before the matching `WorktreeRemoved`.
  65    WorktreeReleased(EntityId, WorktreeId),
        /// `move_worktree` changed the order of the worktrees.
  66    WorktreeOrderChanged,
        /// Update observation towards the downstream client was (re)installed for this worktree.
  67    WorktreeUpdateSent(Entity<Worktree>),
        /// Forwarded `worktree::Event::UpdatedEntries` of the given worktree.
  68    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
        /// Forwarded `worktree::Event::UpdatedGitRepositories` of the given worktree.
  69    WorktreeUpdatedGitRepositories(WorktreeId),
        /// Forwarded `worktree::Event::DeletedEntry` of the given worktree.
  70    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
  71}
  72
    // Allows `WorktreeStore` to emit `WorktreeStoreEvent`s through `cx.emit`.
  73impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  74
  75impl WorktreeStore {
        /// Registers this store's project-entry request handlers (create, copy, delete, expand
        /// and expand-all) on `client`.
  76    pub fn init(client: &AnyProtoClient) {
  77        client.add_entity_request_handler(Self::handle_create_project_entry);
  78        client.add_entity_request_handler(Self::handle_copy_project_entry);
  79        client.add_entity_request_handler(Self::handle_delete_project_entry);
  80        client.add_entity_request_handler(Self::handle_expand_project_entry);
  81        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
  82    }
  83
  84    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  85        Self {
  86            next_entry_id: Default::default(),
  87            loading_worktrees: Default::default(),
  88            downstream_client: None,
  89            worktrees: Vec::new(),
  90            worktrees_reordered: false,
  91            retain_worktrees,
  92            state: WorktreeStoreState::Local { fs },
  93        }
  94    }
  95
  96    pub fn remote(
  97        retain_worktrees: bool,
  98        upstream_client: AnyProtoClient,
  99        upstream_project_id: u64,
 100    ) -> Self {
 101        Self {
 102            next_entry_id: Default::default(),
 103            loading_worktrees: Default::default(),
 104            downstream_client: None,
 105            worktrees: Vec::new(),
 106            worktrees_reordered: false,
 107            retain_worktrees,
 108            state: WorktreeStoreState::Remote {
 109                upstream_client,
 110                upstream_project_id,
 111            },
 112        }
 113    }
 114
 115    /// Iterates through all worktrees, including ones that don't appear in the project panel
 116    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 117        self.worktrees
 118            .iter()
 119            .filter_map(move |worktree| worktree.upgrade())
 120    }
 121
 122    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 123    pub fn visible_worktrees<'a>(
 124        &'a self,
 125        cx: &'a App,
 126    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 127        self.worktrees()
 128            .filter(|worktree| worktree.read(cx).is_visible())
 129    }
 130
 131    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 132        self.worktrees()
 133            .find(|worktree| worktree.read(cx).id() == id)
 134    }
 135
 136    pub fn worktree_for_entry(
 137        &self,
 138        entry_id: ProjectEntryId,
 139        cx: &App,
 140    ) -> Option<Entity<Worktree>> {
 141        self.worktrees()
 142            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 143    }
 144
 145    pub fn find_worktree(
 146        &self,
 147        abs_path: impl Into<SanitizedPath>,
 148        cx: &App,
 149    ) -> Option<(Entity<Worktree>, PathBuf)> {
 150        let abs_path: SanitizedPath = abs_path.into();
 151        for tree in self.worktrees() {
 152            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
 153                return Some((tree.clone(), relative_path.into()));
 154            }
 155        }
 156        None
 157    }
 158
 159    pub fn find_or_create_worktree(
 160        &mut self,
 161        abs_path: impl AsRef<Path>,
 162        visible: bool,
 163        cx: &mut Context<Self>,
 164    ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
 165        let abs_path = abs_path.as_ref();
 166        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 167            Task::ready(Ok((tree, relative_path)))
 168        } else {
 169            let worktree = self.create_worktree(abs_path, visible, cx);
 170            cx.background_spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 171        }
 172    }
 173
 174    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 175        self.worktrees()
 176            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 177    }
 178
 179    pub fn worktree_and_entry_for_id<'a>(
 180        &'a self,
 181        entry_id: ProjectEntryId,
 182        cx: &'a App,
 183    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 184        self.worktrees().find_map(|worktree| {
 185            worktree
 186                .read(cx)
 187                .entry_for_id(entry_id)
 188                .map(|e| (worktree.clone(), e))
 189        })
 190    }
 191
 192    pub fn entry_for_path(&self, path: &ProjectPath, cx: &App) -> Option<Entry> {
 193        self.worktree_for_id(path.worktree_id, cx)?
 194            .read(cx)
 195            .entry_for_path(&path.path)
 196            .cloned()
 197    }
 198
 199    pub fn create_worktree(
 200        &mut self,
 201        abs_path: impl Into<SanitizedPath>,
 202        visible: bool,
 203        cx: &mut Context<Self>,
 204    ) -> Task<Result<Entity<Worktree>>> {
 205        let abs_path: SanitizedPath = abs_path.into();
 206        if !self.loading_worktrees.contains_key(&abs_path) {
 207            let task = match &self.state {
 208                WorktreeStoreState::Remote {
 209                    upstream_client, ..
 210                } => {
 211                    if upstream_client.is_via_collab() {
 212                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 213                    } else {
 214                        self.create_ssh_worktree(
 215                            upstream_client.clone(),
 216                            abs_path.clone(),
 217                            visible,
 218                            cx,
 219                        )
 220                    }
 221                }
 222                WorktreeStoreState::Local { fs } => {
 223                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 224                }
 225            };
 226
 227            self.loading_worktrees
 228                .insert(abs_path.clone(), task.shared());
 229        }
 230        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 231        cx.spawn(async move |this, cx| {
 232            let result = task.await;
 233            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
 234                .ok();
 235            match result {
 236                Ok(worktree) => Ok(worktree),
 237                Err(err) => Err((*err).cloned()),
 238            }
 239        })
 240    }
 241
 242    fn create_ssh_worktree(
 243        &mut self,
 244        client: AnyProtoClient,
 245        abs_path: impl Into<SanitizedPath>,
 246        visible: bool,
 247        cx: &mut Context<Self>,
 248    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 249        let mut abs_path = Into::<SanitizedPath>::into(abs_path).to_string();
 250        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 251        // in which case want to strip the leading the `/`.
 252        // On the host-side, the `~` will get expanded.
 253        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 254        if abs_path.starts_with("/~") {
 255            abs_path = abs_path[1..].to_string();
 256        }
 257        if abs_path.is_empty() || abs_path == "/" {
 258            abs_path = "~/".to_string();
 259        }
 260        cx.spawn(async move |this, cx| {
 261            let this = this.upgrade().context("Dropped worktree store")?;
 262
 263            let path = Path::new(abs_path.as_str());
 264            let response = client
 265                .request(proto::AddWorktree {
 266                    project_id: SSH_PROJECT_ID,
 267                    path: path.to_proto(),
 268                    visible,
 269                })
 270                .await?;
 271
 272            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 273                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 274            })? {
 275                return Ok(existing_worktree);
 276            }
 277
 278            let root_path_buf = PathBuf::from_proto(response.canonicalized_path.clone());
 279            let root_name = root_path_buf
 280                .file_name()
 281                .map(|n| n.to_string_lossy().to_string())
 282                .unwrap_or(root_path_buf.to_string_lossy().to_string());
 283
 284            let worktree = cx.update(|cx| {
 285                Worktree::remote(
 286                    SSH_PROJECT_ID,
 287                    0,
 288                    proto::WorktreeMetadata {
 289                        id: response.worktree_id,
 290                        root_name,
 291                        visible,
 292                        abs_path: response.canonicalized_path,
 293                    },
 294                    client,
 295                    cx,
 296                )
 297            })?;
 298
 299            this.update(cx, |this, cx| {
 300                this.add(&worktree, cx);
 301            })?;
 302            Ok(worktree)
 303        })
 304    }
 305
 306    fn create_local_worktree(
 307        &mut self,
 308        fs: Arc<dyn Fs>,
 309        abs_path: impl Into<SanitizedPath>,
 310        visible: bool,
 311        cx: &mut Context<Self>,
 312    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 313        let next_entry_id = self.next_entry_id.clone();
 314        let path: SanitizedPath = abs_path.into();
 315
 316        cx.spawn(async move |this, cx| {
 317            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, cx).await;
 318
 319            let worktree = worktree?;
 320
 321            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 322
 323            if visible {
 324                cx.update(|cx| {
 325                    cx.add_recent_document(path.as_path());
 326                })
 327                .log_err();
 328            }
 329
 330            Ok(worktree)
 331        })
 332    }
 333
 334    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 335        let worktree_id = worktree.read(cx).id();
 336        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 337
 338        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 339        let handle = if push_strong_handle {
 340            WorktreeHandle::Strong(worktree.clone())
 341        } else {
 342            WorktreeHandle::Weak(worktree.downgrade())
 343        };
 344        if self.worktrees_reordered {
 345            self.worktrees.push(handle);
 346        } else {
 347            let i = match self
 348                .worktrees
 349                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 350                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 351                }) {
 352                Ok(i) | Err(i) => i,
 353            };
 354            self.worktrees.insert(i, handle);
 355        }
 356
 357        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 358        self.send_project_updates(cx);
 359
 360        let handle_id = worktree.entity_id();
 361        cx.subscribe(worktree, |_, worktree, event, cx| {
 362            let worktree_id = worktree.update(cx, |worktree, _| worktree.id());
 363            match event {
 364                worktree::Event::UpdatedEntries(changes) => {
 365                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 366                        worktree_id,
 367                        changes.clone(),
 368                    ));
 369                }
 370                worktree::Event::UpdatedGitRepositories(_) => {
 371                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 372                        worktree_id,
 373                    ));
 374                }
 375                worktree::Event::DeletedEntry(id) => {
 376                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 377                }
 378            }
 379        })
 380        .detach();
 381        cx.observe_release(worktree, move |this, worktree, cx| {
 382            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 383                handle_id,
 384                worktree.id(),
 385            ));
 386            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 387                handle_id,
 388                worktree.id(),
 389            ));
 390            this.send_project_updates(cx);
 391        })
 392        .detach();
 393    }
 394
 395    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 396        self.worktrees.retain(|worktree| {
 397            if let Some(worktree) = worktree.upgrade() {
 398                if worktree.read(cx).id() == id_to_remove {
 399                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 400                        worktree.entity_id(),
 401                        id_to_remove,
 402                    ));
 403                    false
 404                } else {
 405                    true
 406                }
 407            } else {
 408                false
 409            }
 410        });
 411        self.send_project_updates(cx);
 412    }
 413
        /// Overrides the flag that records whether the worktree order was customized; while it
        /// is set, `add` appends new worktrees instead of inserting them by absolute path.
 414    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 415        self.worktrees_reordered = worktrees_reordered;
 416    }
 417
 418    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 419        match &self.state {
 420            WorktreeStoreState::Remote {
 421                upstream_client,
 422                upstream_project_id,
 423                ..
 424            } => Some((upstream_client.clone(), *upstream_project_id)),
 425            WorktreeStoreState::Local { .. } => None,
 426        }
 427    }
 428
 429    pub fn set_worktrees_from_proto(
 430        &mut self,
 431        worktrees: Vec<proto::WorktreeMetadata>,
 432        replica_id: ReplicaId,
 433        cx: &mut Context<Self>,
 434    ) -> Result<()> {
 435        let mut old_worktrees_by_id = self
 436            .worktrees
 437            .drain(..)
 438            .filter_map(|worktree| {
 439                let worktree = worktree.upgrade()?;
 440                Some((worktree.read(cx).id(), worktree))
 441            })
 442            .collect::<HashMap<_, _>>();
 443
 444        let (client, project_id) = self
 445            .upstream_client()
 446            .clone()
 447            .ok_or_else(|| anyhow!("invalid project"))?;
 448
 449        for worktree in worktrees {
 450            if let Some(old_worktree) =
 451                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 452            {
 453                let push_strong_handle =
 454                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 455                let handle = if push_strong_handle {
 456                    WorktreeHandle::Strong(old_worktree.clone())
 457                } else {
 458                    WorktreeHandle::Weak(old_worktree.downgrade())
 459                };
 460                self.worktrees.push(handle);
 461            } else {
 462                self.add(
 463                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 464                    cx,
 465                );
 466            }
 467        }
 468        self.send_project_updates(cx);
 469
 470        Ok(())
 471    }
 472
 473    pub fn move_worktree(
 474        &mut self,
 475        source: WorktreeId,
 476        destination: WorktreeId,
 477        cx: &mut Context<Self>,
 478    ) -> Result<()> {
 479        if source == destination {
 480            return Ok(());
 481        }
 482
 483        let mut source_index = None;
 484        let mut destination_index = None;
 485        for (i, worktree) in self.worktrees.iter().enumerate() {
 486            if let Some(worktree) = worktree.upgrade() {
 487                let worktree_id = worktree.read(cx).id();
 488                if worktree_id == source {
 489                    source_index = Some(i);
 490                    if destination_index.is_some() {
 491                        break;
 492                    }
 493                } else if worktree_id == destination {
 494                    destination_index = Some(i);
 495                    if source_index.is_some() {
 496                        break;
 497                    }
 498                }
 499            }
 500        }
 501
 502        let source_index =
 503            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 504        let destination_index =
 505            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 506
 507        if source_index == destination_index {
 508            return Ok(());
 509        }
 510
 511        let worktree_to_move = self.worktrees.remove(source_index);
 512        self.worktrees.insert(destination_index, worktree_to_move);
 513        self.worktrees_reordered = true;
 514        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 515        cx.notify();
 516        Ok(())
 517    }
 518
 519    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 520        for worktree in &self.worktrees {
 521            if let Some(worktree) = worktree.upgrade() {
 522                worktree.update(cx, |worktree, _| {
 523                    if let Some(worktree) = worktree.as_remote_mut() {
 524                        worktree.disconnected_from_host();
 525                    }
 526                });
 527            }
 528        }
 529    }
 530
 531    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 532        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 533            return;
 534        };
 535
 536        let update = proto::UpdateProject {
 537            project_id,
 538            worktrees: self.worktree_metadata_protos(cx),
 539        };
 540
 541        // collab has bad concurrency guarantees, so we send requests in serial.
 542        let update_project = if downstream_client.is_via_collab() {
 543            Some(downstream_client.request(update))
 544        } else {
 545            downstream_client.send(update).log_err();
 546            None
 547        };
 548        cx.spawn(async move |this, cx| {
 549            if let Some(update_project) = update_project {
 550                update_project.await?;
 551            }
 552
 553            this.update(cx, |this, cx| {
 554                let worktrees = this.worktrees().collect::<Vec<_>>();
 555
 556                for worktree in worktrees {
 557                    worktree.update(cx, |worktree, cx| {
 558                        let client = downstream_client.clone();
 559                        worktree.observe_updates(project_id, cx, {
 560                            move |update| {
 561                                let client = client.clone();
 562                                async move {
 563                                    if client.is_via_collab() {
 564                                        match update {
 565                                            proto::WorktreeRelatedMessage::UpdateWorktree(
 566                                                update,
 567                                            ) => {
 568                                                client
 569                                                    .request(update)
 570                                                    .map(|result| result.log_err().is_some())
 571                                                    .await
 572                                            }
 573                                            proto::WorktreeRelatedMessage::UpdateRepository(
 574                                                update,
 575                                            ) => {
 576                                                client
 577                                                    .request(update)
 578                                                    .map(|result| result.log_err().is_some())
 579                                                    .await
 580                                            }
 581                                            proto::WorktreeRelatedMessage::RemoveRepository(
 582                                                update,
 583                                            ) => {
 584                                                client
 585                                                    .request(update)
 586                                                    .map(|result| result.log_err().is_some())
 587                                                    .await
 588                                            }
 589                                        }
 590                                    } else {
 591                                        match update {
 592                                            proto::WorktreeRelatedMessage::UpdateWorktree(
 593                                                update,
 594                                            ) => client.send(update).log_err().is_some(),
 595                                            proto::WorktreeRelatedMessage::UpdateRepository(
 596                                                update,
 597                                            ) => client.send(update).log_err().is_some(),
 598                                            proto::WorktreeRelatedMessage::RemoveRepository(
 599                                                update,
 600                                            ) => client.send(update).log_err().is_some(),
 601                                        }
 602                                    }
 603                                }
 604                            }
 605                        });
 606                    });
 607
 608                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 609                }
 610
 611                anyhow::Ok(())
 612            })
 613        })
 614        .detach_and_log_err(cx);
 615    }
 616
 617    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
 618        self.worktrees()
 619            .map(|worktree| {
 620                let worktree = worktree.read(cx);
 621                proto::WorktreeMetadata {
 622                    id: worktree.id().to_proto(),
 623                    root_name: worktree.root_name().into(),
 624                    visible: worktree.is_visible(),
 625                    abs_path: worktree.abs_path().to_proto(),
 626                }
 627            })
 628            .collect()
 629    }
 630
 631    pub fn shared(
 632        &mut self,
 633        remote_id: u64,
 634        downstream_client: AnyProtoClient,
 635        cx: &mut Context<Self>,
 636    ) {
 637        self.retain_worktrees = true;
 638        self.downstream_client = Some((downstream_client, remote_id));
 639
 640        // When shared, retain all worktrees
 641        for worktree_handle in self.worktrees.iter_mut() {
 642            match worktree_handle {
 643                WorktreeHandle::Strong(_) => {}
 644                WorktreeHandle::Weak(worktree) => {
 645                    if let Some(worktree) = worktree.upgrade() {
 646                        *worktree_handle = WorktreeHandle::Strong(worktree);
 647                    }
 648                }
 649            }
 650        }
 651        self.send_project_updates(cx);
 652    }
 653
 654    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 655        self.retain_worktrees = false;
 656        self.downstream_client.take();
 657
 658        // When not shared, only retain the visible worktrees
 659        for worktree_handle in self.worktrees.iter_mut() {
 660            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 661                let is_visible = worktree.update(cx, |worktree, _| {
 662                    worktree.stop_observing_updates();
 663                    worktree.is_visible()
 664                });
 665                if !is_visible {
 666                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 667                }
 668            }
 669        }
 670    }
 671
 672    /// Searches the visible worktrees and streams the paths of files that *might* match `query`.
        ///
        /// Only visible worktrees for which `as_local()` returns `Some` contribute a snapshot.
        /// The returned receiver yields candidate `ProjectPath`s in the order in which they were
        /// queued on the output channel, and is closed once `limit` paths have been forwarded, the
        /// receiver was dropped, or the output channel ran dry.
        ///
        /// NOTE(review): the forwarding loop only stops on `matched == limit`, so a `limit` of 0
        /// never triggers that break — confirm callers always pass a non-zero limit.
 673    pub fn find_search_candidates(
 674        &self,
 675        query: SearchQuery,
 676        limit: usize,
 677        open_entries: HashSet<ProjectEntryId>,
 678        fs: Arc<dyn Fs>,
 679        cx: &Context<Self>,
 680    ) -> Receiver<ProjectPath> {
            // Snapshot each searched worktree together with its settings up front, so the
            // background tasks do not need access to `cx`.
 681        let snapshots = self
 682            .visible_worktrees(cx)
 683            .filter_map(|tree| {
 684                let tree = tree.read(cx);
 685                Some((tree.snapshot(), tree.as_local()?.settings()))
 686            })
 687            .collect::<Vec<_>>();
 688
 689        let executor = cx.background_executor().clone();
 690
 691        // We want to return entries in the order they are in the worktrees, so we have one
 692        // thread that iterates over the worktrees (and ignored directories) as necessary,
 693        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 694        // channel.
 695        // We spawn a number of workers that take items from the filter channel and check the query
 696        // against the version of the file on disk.
 697        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 698        let (output_tx, output_rx) = smol::channel::bounded(64);
 699        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 700
            // Producer: walks the snapshots and feeds both the filter and the output channel.
            // Errors are only logged.
 701        let input = cx.background_spawn({
 702            let fs = fs.clone();
 703            let query = query.clone();
 704            async move {
 705                Self::find_candidate_paths(
 706                    fs,
 707                    snapshots,
 708                    open_entries,
 709                    query,
 710                    filter_tx,
 711                    output_tx,
 712                )
 713                .await
 714                .log_err();
 715            }
 716        });
            // Number of filter workers spawned below, each pulling from the same filter channel.
 717        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 718        let filters = cx.background_spawn(async move {
 719            let fs = &fs;
 720            let query = &query;
 721            executor
 722                .scoped(move |scope| {
 723                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 724                        let filter_rx = filter_rx.clone();
 725                        scope.spawn(async move {
 726                            Self::filter_paths(fs, filter_rx, query)
 727                                .await
 728                                .log_with_level(log::Level::Debug);
 729                        })
 730                    }
 731                })
 732                .await;
 733        });
            // Forwarder: awaits each queued one-shot receiver in order and passes on the paths it
            // yields; a receiver that yields nothing is skipped.
 734        cx.background_spawn(async move {
 735            let mut matched = 0;
 736            while let Ok(mut receiver) = output_rx.recv().await {
 737                let Some(path) = receiver.next().await else {
 738                    continue;
 739                };
                    // A failed send means the caller dropped the result receiver.
 740                let Ok(_) = matching_paths_tx.send(path).await else {
 741                    break;
 742                };
 743                matched += 1;
 744                if matched == limit {
 745                    break;
 746                }
 747            }
                // The producer and filter tasks are owned by this task and released only here.
                // NOTE(review): presumably dropping a gpui `Task` cancels it — confirm.
 748            drop(input);
 749            drop(filters);
 750        })
 751        .detach();
 752        matching_paths_rx
 753    }
 754
 755    fn scan_ignored_dir<'a>(
 756        fs: &'a Arc<dyn Fs>,
 757        snapshot: &'a worktree::Snapshot,
 758        path: &'a Path,
 759        query: &'a SearchQuery,
 760        include_root: bool,
 761        filter_tx: &'a Sender<MatchingEntry>,
 762        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 763    ) -> BoxFuture<'a, Result<()>> {
 764        async move {
 765            let abs_path = snapshot.abs_path().join(path);
 766            let Some(mut files) = fs
 767                .read_dir(&abs_path)
 768                .await
 769                .with_context(|| format!("listing ignored path {abs_path:?}"))
 770                .log_err()
 771            else {
 772                return Ok(());
 773            };
 774
 775            let mut results = Vec::new();
 776
 777            while let Some(Ok(file)) = files.next().await {
 778                let Some(metadata) = fs
 779                    .metadata(&file)
 780                    .await
 781                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 782                    .log_err()
 783                    .flatten()
 784                else {
 785                    continue;
 786                };
 787                if metadata.is_symlink || metadata.is_fifo {
 788                    continue;
 789                }
 790                results.push((
 791                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 792                    !metadata.is_dir,
 793                ))
 794            }
 795            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
 796            for (path, is_file) in results {
 797                if is_file {
 798                    if query.filters_path() {
 799                        let matched_path = if include_root {
 800                            let mut full_path = PathBuf::from(snapshot.root_name());
 801                            full_path.push(&path);
 802                            query.file_matches(&full_path)
 803                        } else {
 804                            query.file_matches(&path)
 805                        };
 806                        if !matched_path {
 807                            continue;
 808                        }
 809                    }
 810                    let (tx, rx) = oneshot::channel();
 811                    output_tx.send(rx).await?;
 812                    filter_tx
 813                        .send(MatchingEntry {
 814                            respond: tx,
 815                            worktree_path: snapshot.abs_path().clone(),
 816                            path: ProjectPath {
 817                                worktree_id: snapshot.id(),
 818                                path: Arc::from(path),
 819                            },
 820                        })
 821                        .await?;
 822                } else {
 823                    Self::scan_ignored_dir(
 824                        fs,
 825                        snapshot,
 826                        &path,
 827                        query,
 828                        include_root,
 829                        filter_tx,
 830                        output_tx,
 831                    )
 832                    .await?;
 833                }
 834            }
 835            Ok(())
 836        }
 837        .boxed()
 838    }
 839
 840    async fn find_candidate_paths(
 841        fs: Arc<dyn Fs>,
 842        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 843        open_entries: HashSet<ProjectEntryId>,
 844        query: SearchQuery,
 845        filter_tx: Sender<MatchingEntry>,
 846        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 847    ) -> Result<()> {
 848        let include_root = snapshots.len() > 1;
 849        for (snapshot, settings) in snapshots {
 850            for entry in snapshot.entries(query.include_ignored(), 0) {
 851                if entry.is_dir() && entry.is_ignored {
 852                    if !settings.is_path_excluded(&entry.path) {
 853                        Self::scan_ignored_dir(
 854                            &fs,
 855                            &snapshot,
 856                            &entry.path,
 857                            &query,
 858                            include_root,
 859                            &filter_tx,
 860                            &output_tx,
 861                        )
 862                        .await?;
 863                    }
 864                    continue;
 865                }
 866
 867                if entry.is_fifo || !entry.is_file() {
 868                    continue;
 869                }
 870
 871                if query.filters_path() {
 872                    let matched_path = if include_root {
 873                        let mut full_path = PathBuf::from(snapshot.root_name());
 874                        full_path.push(&entry.path);
 875                        query.file_matches(&full_path)
 876                    } else {
 877                        query.file_matches(&entry.path)
 878                    };
 879                    if !matched_path {
 880                        continue;
 881                    }
 882                }
 883
 884                let (mut tx, rx) = oneshot::channel();
 885
 886                if open_entries.contains(&entry.id) {
 887                    tx.send(ProjectPath {
 888                        worktree_id: snapshot.id(),
 889                        path: entry.path.clone(),
 890                    })
 891                    .await?;
 892                } else {
 893                    filter_tx
 894                        .send(MatchingEntry {
 895                            respond: tx,
 896                            worktree_path: snapshot.abs_path().clone(),
 897                            path: ProjectPath {
 898                                worktree_id: snapshot.id(),
 899                                path: entry.path.clone(),
 900                            },
 901                        })
 902                        .await?;
 903                }
 904
 905                output_tx.send(rx).await?;
 906            }
 907        }
 908        Ok(())
 909    }
 910
 911    async fn filter_paths(
 912        fs: &Arc<dyn Fs>,
 913        mut input: Receiver<MatchingEntry>,
 914        query: &SearchQuery,
 915    ) -> Result<()> {
 916        let mut input = pin!(input);
 917        while let Some(mut entry) = input.next().await {
 918            let abs_path = entry.worktree_path.join(&entry.path.path);
 919            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
 920                continue;
 921            };
 922
 923            let mut file = BufReader::new(file);
 924            let file_start = file.fill_buf()?;
 925
 926            if let Err(Some(starting_position)) =
 927                std::str::from_utf8(file_start).map_err(|e| e.error_len())
 928            {
 929                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
 930                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
 931                log::debug!(
 932                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
 933                );
 934                continue;
 935            }
 936
 937            if query.detect(file).unwrap_or(false) {
 938                entry.respond.send(entry.path).await?
 939            }
 940        }
 941
 942        Ok(())
 943    }
 944
 945    pub async fn handle_create_project_entry(
 946        this: Entity<Self>,
 947        envelope: TypedEnvelope<proto::CreateProjectEntry>,
 948        mut cx: AsyncApp,
 949    ) -> Result<proto::ProjectEntryResponse> {
 950        let worktree = this.update(&mut cx, |this, cx| {
 951            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 952            this.worktree_for_id(worktree_id, cx)
 953                .ok_or_else(|| anyhow!("worktree not found"))
 954        })??;
 955        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
 956    }
 957
 958    pub async fn handle_copy_project_entry(
 959        this: Entity<Self>,
 960        envelope: TypedEnvelope<proto::CopyProjectEntry>,
 961        mut cx: AsyncApp,
 962    ) -> Result<proto::ProjectEntryResponse> {
 963        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 964        let worktree = this.update(&mut cx, |this, cx| {
 965            this.worktree_for_entry(entry_id, cx)
 966                .ok_or_else(|| anyhow!("worktree not found"))
 967        })??;
 968        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
 969    }
 970
 971    pub async fn handle_delete_project_entry(
 972        this: Entity<Self>,
 973        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
 974        mut cx: AsyncApp,
 975    ) -> Result<proto::ProjectEntryResponse> {
 976        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 977        let worktree = this.update(&mut cx, |this, cx| {
 978            this.worktree_for_entry(entry_id, cx)
 979                .ok_or_else(|| anyhow!("worktree not found"))
 980        })??;
 981        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
 982    }
 983
 984    pub async fn handle_expand_project_entry(
 985        this: Entity<Self>,
 986        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
 987        mut cx: AsyncApp,
 988    ) -> Result<proto::ExpandProjectEntryResponse> {
 989        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 990        let worktree = this
 991            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
 992            .ok_or_else(|| anyhow!("invalid request"))?;
 993        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
 994    }
 995
 996    pub async fn handle_expand_all_for_project_entry(
 997        this: Entity<Self>,
 998        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
 999        mut cx: AsyncApp,
1000    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1001        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1002        let worktree = this
1003            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1004            .ok_or_else(|| anyhow!("invalid request"))?;
1005        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1006    }
1007}
1008
/// A handle to a worktree, as stored in `WorktreeStore::worktrees`, held either
/// through an `Entity` or through a `WeakEntity`.
#[derive(Clone, Debug)]
enum WorktreeHandle {
    /// Wraps an `Entity<Worktree>`; `upgrade` always returns a clone of it.
    Strong(Entity<Worktree>),
    /// Wraps a `WeakEntity<Worktree>`; `upgrade` returns whatever
    /// `WeakEntity::upgrade` yields, which may be `None`.
    Weak(WeakEntity<Worktree>),
}
1014
1015impl WorktreeHandle {
1016    fn upgrade(&self) -> Option<Entity<Worktree>> {
1017        match self {
1018            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1019            WorktreeHandle::Weak(handle) => handle.upgrade(),
1020        }
1021    }
1022}