worktree_store.rs

   1use std::{
   2    io::{BufRead, BufReader},
   3    path::{Path, PathBuf},
   4    pin::pin,
   5    sync::{Arc, atomic::AtomicUsize},
   6};
   7
   8use anyhow::{Context as _, Result, anyhow};
   9use collections::{HashMap, HashSet};
  10use fs::Fs;
  11use futures::{
  12    FutureExt, SinkExt,
  13    future::{BoxFuture, Shared},
  14};
  15use gpui::{
  16    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
  17};
  18use postage::oneshot;
  19use rpc::{
  20    AnyProtoClient, ErrorExt, TypedEnvelope,
  21    proto::{self, FromProto, REMOTE_SERVER_PROJECT_ID, ToProto},
  22};
  23use smol::{
  24    channel::{Receiver, Sender},
  25    stream::StreamExt,
  26};
  27use text::ReplicaId;
  28use util::{
  29    ResultExt,
  30    paths::{PathStyle, RemotePathBuf, SanitizedPath},
  31};
  32use worktree::{
  33    Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree, WorktreeId,
  34    WorktreeSettings,
  35};
  36
  37use crate::{ProjectPath, search::SearchQuery};
  38
/// A candidate file handed to the search filter workers through the filter channel.
struct MatchingEntry {
    /// Absolute path of the root of the worktree that `path` belongs to.
    worktree_path: Arc<Path>,
    /// Worktree id plus worktree-relative path of the candidate file.
    path: ProjectPath,
    /// Sending half of the one-shot channel whose receiving half was queued on the
    /// output channel; presumably used by the filter workers to report a match
    /// (the consumer is not visible in this part of the file).
    respond: oneshot::Sender<ProjectPath>,
}
  44
/// Where the worktrees of a [`WorktreeStore`] come from.
enum WorktreeStoreState {
    /// Worktrees are created directly on top of a filesystem.
    Local {
        /// Filesystem handed to every locally created worktree.
        fs: Arc<dyn Fs>,
    },
    /// Worktrees are created by asking an upstream peer over RPC.
    Remote {
        /// Client used to send worktree requests upstream.
        upstream_client: AnyProtoClient,
        /// Project id used when building remote worktrees from upstream metadata.
        upstream_project_id: u64,
        /// Path style used to interpret absolute paths sent upstream.
        path_style: PathStyle,
    },
}
  55
/// Owns the set of worktrees of a project and relays their events.
pub struct WorktreeStore {
    /// Shared counter passed to every locally created worktree.
    next_entry_id: Arc<AtomicUsize>,
    /// Client and project id that worktree updates are forwarded to, if any.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// When `true`, every added worktree is held strongly, not only visible ones.
    retain_worktrees: bool,
    /// Handles (strong or weak) to the worktrees, in display order.
    worktrees: Vec<WorktreeHandle>,
    /// Set once the order was changed explicitly; from then on new worktrees are
    /// appended instead of being inserted by absolute path.
    worktrees_reordered: bool,
    // The nested type is spelled out once here, so the complexity lint is silenced.
    /// In-flight worktree creations keyed by sanitized absolute path, so that
    /// concurrent requests for the same path share one task.
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
    /// Local or remote backing of this store.
    state: WorktreeStoreState,
}
  67
/// Events emitted by a [`WorktreeStore`].
#[derive(Debug)]
pub enum WorktreeStoreEvent {
    /// A worktree was added to the store.
    WorktreeAdded(Entity<Worktree>),
    /// A worktree was removed from the store, either explicitly or because it was released.
    WorktreeRemoved(EntityId, WorktreeId),
    /// A worktree entity observed by the store was released.
    WorktreeReleased(EntityId, WorktreeId),
    /// The order of the worktrees was changed by moving one of them.
    WorktreeOrderChanged,
    /// Update forwarding to the downstream client was (re)started for this worktree.
    WorktreeUpdateSent(Entity<Worktree>),
    /// Forwarded from the worktree: a set of entries changed.
    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
    /// Forwarded from the worktree: a set of git repositories changed.
    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
    /// Forwarded from the worktree: an entry was deleted.
    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
}
  79
// Lets the store emit `WorktreeStoreEvent`s through `cx.emit`.
impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  81
  82impl WorktreeStore {
    /// Registers the store's entity request handlers on `client`: creating,
    /// copying, deleting and expanding project entries, plus expanding all
    /// entries below a project entry.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_create_project_entry);
        client.add_entity_request_handler(Self::handle_copy_project_entry);
        client.add_entity_request_handler(Self::handle_delete_project_entry);
        client.add_entity_request_handler(Self::handle_expand_project_entry);
        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
    }
  90
  91    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  92        Self {
  93            next_entry_id: Default::default(),
  94            loading_worktrees: Default::default(),
  95            downstream_client: None,
  96            worktrees: Vec::new(),
  97            worktrees_reordered: false,
  98            retain_worktrees,
  99            state: WorktreeStoreState::Local { fs },
 100        }
 101    }
 102
 103    pub fn remote(
 104        retain_worktrees: bool,
 105        upstream_client: AnyProtoClient,
 106        upstream_project_id: u64,
 107        path_style: PathStyle,
 108    ) -> Self {
 109        Self {
 110            next_entry_id: Default::default(),
 111            loading_worktrees: Default::default(),
 112            downstream_client: None,
 113            worktrees: Vec::new(),
 114            worktrees_reordered: false,
 115            retain_worktrees,
 116            state: WorktreeStoreState::Remote {
 117                upstream_client,
 118                upstream_project_id,
 119                path_style,
 120            },
 121        }
 122    }
 123
 124    /// Iterates through all worktrees, including ones that don't appear in the project panel
 125    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 126        self.worktrees
 127            .iter()
 128            .filter_map(move |worktree| worktree.upgrade())
 129    }
 130
 131    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 132    pub fn visible_worktrees<'a>(
 133        &'a self,
 134        cx: &'a App,
 135    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 136        self.worktrees()
 137            .filter(|worktree| worktree.read(cx).is_visible())
 138    }
 139
 140    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 141        self.worktrees()
 142            .find(|worktree| worktree.read(cx).id() == id)
 143    }
 144
 145    pub fn worktree_for_entry(
 146        &self,
 147        entry_id: ProjectEntryId,
 148        cx: &App,
 149    ) -> Option<Entity<Worktree>> {
 150        self.worktrees()
 151            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 152    }
 153
 154    pub fn find_worktree(
 155        &self,
 156        abs_path: impl AsRef<Path>,
 157        cx: &App,
 158    ) -> Option<(Entity<Worktree>, PathBuf)> {
 159        let abs_path = SanitizedPath::new(&abs_path);
 160        for tree in self.worktrees() {
 161            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
 162                return Some((tree.clone(), relative_path.into()));
 163            }
 164        }
 165        None
 166    }
 167
 168    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
 169        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 170        worktree.read(cx).absolutize(&project_path.path).ok()
 171    }
 172
 173    pub fn find_or_create_worktree(
 174        &mut self,
 175        abs_path: impl AsRef<Path>,
 176        visible: bool,
 177        cx: &mut Context<Self>,
 178    ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
 179        let abs_path = abs_path.as_ref();
 180        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 181            Task::ready(Ok((tree, relative_path)))
 182        } else {
 183            let worktree = self.create_worktree(abs_path, visible, cx);
 184            cx.background_spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 185        }
 186    }
 187
 188    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 189        self.worktrees()
 190            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 191    }
 192
 193    pub fn worktree_and_entry_for_id<'a>(
 194        &'a self,
 195        entry_id: ProjectEntryId,
 196        cx: &'a App,
 197    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 198        self.worktrees().find_map(|worktree| {
 199            worktree
 200                .read(cx)
 201                .entry_for_id(entry_id)
 202                .map(|e| (worktree.clone(), e))
 203        })
 204    }
 205
 206    pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
 207        self.worktree_for_id(path.worktree_id, cx)?
 208            .read(cx)
 209            .entry_for_path(&path.path)
 210    }
 211
 212    pub fn create_worktree(
 213        &mut self,
 214        abs_path: impl AsRef<Path>,
 215        visible: bool,
 216        cx: &mut Context<Self>,
 217    ) -> Task<Result<Entity<Worktree>>> {
 218        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
 219        if !self.loading_worktrees.contains_key(&abs_path) {
 220            let task = match &self.state {
 221                WorktreeStoreState::Remote {
 222                    upstream_client,
 223                    path_style,
 224                    ..
 225                } => {
 226                    if upstream_client.is_via_collab() {
 227                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 228                    } else {
 229                        let abs_path = RemotePathBuf::new(abs_path.to_path_buf(), *path_style);
 230                        self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
 231                    }
 232                }
 233                WorktreeStoreState::Local { fs } => {
 234                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 235                }
 236            };
 237
 238            self.loading_worktrees
 239                .insert(abs_path.clone(), task.shared());
 240        }
 241        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 242        cx.spawn(async move |this, cx| {
 243            let result = task.await;
 244            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
 245                .ok();
 246            match result {
 247                Ok(worktree) => Ok(worktree),
 248                Err(err) => Err((*err).cloned()),
 249            }
 250        })
 251    }
 252
 253    fn create_remote_worktree(
 254        &mut self,
 255        client: AnyProtoClient,
 256        abs_path: RemotePathBuf,
 257        visible: bool,
 258        cx: &mut Context<Self>,
 259    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 260        let path_style = abs_path.path_style();
 261        let mut abs_path = abs_path.to_string();
 262        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 263        // in which case want to strip the leading the `/`.
 264        // On the host-side, the `~` will get expanded.
 265        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 266        if abs_path.starts_with("/~") {
 267            abs_path = abs_path[1..].to_string();
 268        }
 269        if abs_path.is_empty() {
 270            abs_path = "~/".to_string();
 271        }
 272
 273        cx.spawn(async move |this, cx| {
 274            let this = this.upgrade().context("Dropped worktree store")?;
 275
 276            let path = RemotePathBuf::new(abs_path.into(), path_style);
 277            let response = client
 278                .request(proto::AddWorktree {
 279                    project_id: REMOTE_SERVER_PROJECT_ID,
 280                    path: path.to_proto(),
 281                    visible,
 282                })
 283                .await?;
 284
 285            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 286                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 287            })? {
 288                return Ok(existing_worktree);
 289            }
 290
 291            let root_path_buf = PathBuf::from_proto(response.canonicalized_path.clone());
 292            let root_name = root_path_buf
 293                .file_name()
 294                .map(|n| n.to_string_lossy().to_string())
 295                .unwrap_or(root_path_buf.to_string_lossy().to_string());
 296
 297            let worktree = cx.update(|cx| {
 298                Worktree::remote(
 299                    REMOTE_SERVER_PROJECT_ID,
 300                    0,
 301                    proto::WorktreeMetadata {
 302                        id: response.worktree_id,
 303                        root_name,
 304                        visible,
 305                        abs_path: response.canonicalized_path,
 306                    },
 307                    client,
 308                    cx,
 309                )
 310            })?;
 311
 312            this.update(cx, |this, cx| {
 313                this.add(&worktree, cx);
 314            })?;
 315            Ok(worktree)
 316        })
 317    }
 318
 319    fn create_local_worktree(
 320        &mut self,
 321        fs: Arc<dyn Fs>,
 322        abs_path: Arc<SanitizedPath>,
 323        visible: bool,
 324        cx: &mut Context<Self>,
 325    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 326        let next_entry_id = self.next_entry_id.clone();
 327
 328        cx.spawn(async move |this, cx| {
 329            let worktree = Worktree::local(
 330                SanitizedPath::cast_arc(abs_path.clone()),
 331                visible,
 332                fs,
 333                next_entry_id,
 334                cx,
 335            )
 336            .await;
 337
 338            let worktree = worktree?;
 339
 340            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 341
 342            if visible {
 343                cx.update(|cx| {
 344                    cx.add_recent_document(abs_path.as_path());
 345                })
 346                .log_err();
 347            }
 348
 349            Ok(worktree)
 350        })
 351    }
 352
 353    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 354        let worktree_id = worktree.read(cx).id();
 355        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 356
 357        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 358        let handle = if push_strong_handle {
 359            WorktreeHandle::Strong(worktree.clone())
 360        } else {
 361            WorktreeHandle::Weak(worktree.downgrade())
 362        };
 363        if self.worktrees_reordered {
 364            self.worktrees.push(handle);
 365        } else {
 366            let i = match self
 367                .worktrees
 368                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 369                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 370                }) {
 371                Ok(i) | Err(i) => i,
 372            };
 373            self.worktrees.insert(i, handle);
 374        }
 375
 376        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 377        self.send_project_updates(cx);
 378
 379        let handle_id = worktree.entity_id();
 380        cx.subscribe(worktree, |_, worktree, event, cx| {
 381            let worktree_id = worktree.read(cx).id();
 382            match event {
 383                worktree::Event::UpdatedEntries(changes) => {
 384                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 385                        worktree_id,
 386                        changes.clone(),
 387                    ));
 388                }
 389                worktree::Event::UpdatedGitRepositories(set) => {
 390                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 391                        worktree_id,
 392                        set.clone(),
 393                    ));
 394                }
 395                worktree::Event::DeletedEntry(id) => {
 396                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 397                }
 398            }
 399        })
 400        .detach();
 401        cx.observe_release(worktree, move |this, worktree, cx| {
 402            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 403                handle_id,
 404                worktree.id(),
 405            ));
 406            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 407                handle_id,
 408                worktree.id(),
 409            ));
 410            this.send_project_updates(cx);
 411        })
 412        .detach();
 413    }
 414
 415    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 416        self.worktrees.retain(|worktree| {
 417            if let Some(worktree) = worktree.upgrade() {
 418                if worktree.read(cx).id() == id_to_remove {
 419                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 420                        worktree.entity_id(),
 421                        id_to_remove,
 422                    ));
 423                    false
 424                } else {
 425                    true
 426                }
 427            } else {
 428                false
 429            }
 430        });
 431        self.send_project_updates(cx);
 432    }
 433
    /// Overrides the flag recording whether the worktree order was changed
    /// explicitly. While it is set, `add` appends new worktrees instead of
    /// inserting them by absolute path.
    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }
 437
 438    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 439        match &self.state {
 440            WorktreeStoreState::Remote {
 441                upstream_client,
 442                upstream_project_id,
 443                ..
 444            } => Some((upstream_client.clone(), *upstream_project_id)),
 445            WorktreeStoreState::Local { .. } => None,
 446        }
 447    }
 448
 449    pub fn set_worktrees_from_proto(
 450        &mut self,
 451        worktrees: Vec<proto::WorktreeMetadata>,
 452        replica_id: ReplicaId,
 453        cx: &mut Context<Self>,
 454    ) -> Result<()> {
 455        let mut old_worktrees_by_id = self
 456            .worktrees
 457            .drain(..)
 458            .filter_map(|worktree| {
 459                let worktree = worktree.upgrade()?;
 460                Some((worktree.read(cx).id(), worktree))
 461            })
 462            .collect::<HashMap<_, _>>();
 463
 464        let (client, project_id) = self.upstream_client().context("invalid project")?;
 465
 466        for worktree in worktrees {
 467            if let Some(old_worktree) =
 468                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 469            {
 470                let push_strong_handle =
 471                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 472                let handle = if push_strong_handle {
 473                    WorktreeHandle::Strong(old_worktree.clone())
 474                } else {
 475                    WorktreeHandle::Weak(old_worktree.downgrade())
 476                };
 477                self.worktrees.push(handle);
 478            } else {
 479                self.add(
 480                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 481                    cx,
 482                );
 483            }
 484        }
 485        self.send_project_updates(cx);
 486
 487        Ok(())
 488    }
 489
 490    pub fn move_worktree(
 491        &mut self,
 492        source: WorktreeId,
 493        destination: WorktreeId,
 494        cx: &mut Context<Self>,
 495    ) -> Result<()> {
 496        if source == destination {
 497            return Ok(());
 498        }
 499
 500        let mut source_index = None;
 501        let mut destination_index = None;
 502        for (i, worktree) in self.worktrees.iter().enumerate() {
 503            if let Some(worktree) = worktree.upgrade() {
 504                let worktree_id = worktree.read(cx).id();
 505                if worktree_id == source {
 506                    source_index = Some(i);
 507                    if destination_index.is_some() {
 508                        break;
 509                    }
 510                } else if worktree_id == destination {
 511                    destination_index = Some(i);
 512                    if source_index.is_some() {
 513                        break;
 514                    }
 515                }
 516            }
 517        }
 518
 519        let source_index =
 520            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 521        let destination_index =
 522            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 523
 524        if source_index == destination_index {
 525            return Ok(());
 526        }
 527
 528        let worktree_to_move = self.worktrees.remove(source_index);
 529        self.worktrees.insert(destination_index, worktree_to_move);
 530        self.worktrees_reordered = true;
 531        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 532        cx.notify();
 533        Ok(())
 534    }
 535
 536    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 537        for worktree in &self.worktrees {
 538            if let Some(worktree) = worktree.upgrade() {
 539                worktree.update(cx, |worktree, _| {
 540                    if let Some(worktree) = worktree.as_remote_mut() {
 541                        worktree.disconnected_from_host();
 542                    }
 543                });
 544            }
 545        }
 546    }
 547
 548    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 549        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 550            return;
 551        };
 552
 553        let update = proto::UpdateProject {
 554            project_id,
 555            worktrees: self.worktree_metadata_protos(cx),
 556        };
 557
 558        // collab has bad concurrency guarantees, so we send requests in serial.
 559        let update_project = if downstream_client.is_via_collab() {
 560            Some(downstream_client.request(update))
 561        } else {
 562            downstream_client.send(update).log_err();
 563            None
 564        };
 565        cx.spawn(async move |this, cx| {
 566            if let Some(update_project) = update_project {
 567                update_project.await?;
 568            }
 569
 570            this.update(cx, |this, cx| {
 571                let worktrees = this.worktrees().collect::<Vec<_>>();
 572
 573                for worktree in worktrees {
 574                    worktree.update(cx, |worktree, cx| {
 575                        let client = downstream_client.clone();
 576                        worktree.observe_updates(project_id, cx, {
 577                            move |update| {
 578                                let client = client.clone();
 579                                async move {
 580                                    if client.is_via_collab() {
 581                                        client
 582                                            .request(update)
 583                                            .map(|result| result.log_err().is_some())
 584                                            .await
 585                                    } else {
 586                                        client.send(update).log_err().is_some()
 587                                    }
 588                                }
 589                            }
 590                        });
 591                    });
 592
 593                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 594                }
 595
 596                anyhow::Ok(())
 597            })
 598        })
 599        .detach_and_log_err(cx);
 600    }
 601
 602    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
 603        self.worktrees()
 604            .map(|worktree| {
 605                let worktree = worktree.read(cx);
 606                proto::WorktreeMetadata {
 607                    id: worktree.id().to_proto(),
 608                    root_name: worktree.root_name().into(),
 609                    visible: worktree.is_visible(),
 610                    abs_path: worktree.abs_path().to_proto(),
 611                }
 612            })
 613            .collect()
 614    }
 615
 616    pub fn shared(
 617        &mut self,
 618        remote_id: u64,
 619        downstream_client: AnyProtoClient,
 620        cx: &mut Context<Self>,
 621    ) {
 622        self.retain_worktrees = true;
 623        self.downstream_client = Some((downstream_client, remote_id));
 624
 625        // When shared, retain all worktrees
 626        for worktree_handle in self.worktrees.iter_mut() {
 627            match worktree_handle {
 628                WorktreeHandle::Strong(_) => {}
 629                WorktreeHandle::Weak(worktree) => {
 630                    if let Some(worktree) = worktree.upgrade() {
 631                        *worktree_handle = WorktreeHandle::Strong(worktree);
 632                    }
 633                }
 634            }
 635        }
 636        self.send_project_updates(cx);
 637    }
 638
 639    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 640        self.retain_worktrees = false;
 641        self.downstream_client.take();
 642
 643        // When not shared, only retain the visible worktrees
 644        for worktree_handle in self.worktrees.iter_mut() {
 645            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 646                let is_visible = worktree.update(cx, |worktree, _| {
 647                    worktree.stop_observing_updates();
 648                    worktree.is_visible()
 649                });
 650                if !is_visible {
 651                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 652                }
 653            }
 654        }
 655    }
 656
    /// Searches all visible worktrees and returns the paths of files that *might* match the
    /// query.
    ///
    /// Only visible worktrees that have a local representation are scanned. `open_entries`
    /// and `fs` are handed on to the candidate producer and the filter workers. Matching
    /// paths are delivered on the returned channel in worktree order; delivery stops after
    /// `limit` paths or once the receiver is dropped.
    // NOTE(review): the counter is incremented before it is compared with `limit`, so a
    // `limit` of 0 never triggers the early stop — confirm callers never pass 0.
    pub fn find_search_candidates(
        &self,
        query: SearchQuery,
        limit: usize,
        open_entries: HashSet<ProjectEntryId>,
        fs: Arc<dyn Fs>,
        cx: &Context<Self>,
    ) -> Receiver<ProjectPath> {
        // Snapshot every visible, local worktree together with its settings up front, so
        // that the background tasks do not need access to the entities.
        let snapshots = self
            .visible_worktrees(cx)
            .filter_map(|tree| {
                let tree = tree.read(cx);
                Some((tree.snapshot(), tree.as_local()?.settings()))
            })
            .collect::<Vec<_>>();

        let executor = cx.background_executor().clone();

        // We want to return entries in the order they are in the worktrees, so we have one
        // thread that iterates over the worktrees (and ignored directories) as necessary,
        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
        // channel.
        // We spawn a number of workers that take items from the filter channel and check the query
        // against the version of the file on disk.
        let (filter_tx, filter_rx) = smol::channel::bounded(64);
        let (output_tx, output_rx) = smol::channel::bounded(64);
        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();

        // Producer: walks the snapshots and feeds both channels. Errors are only logged.
        let input = cx.background_spawn({
            let fs = fs.clone();
            let query = query.clone();
            async move {
                Self::find_candidate_paths(
                    fs,
                    snapshots,
                    open_entries,
                    query,
                    filter_tx,
                    output_tx,
                )
                .await
                .log_err();
            }
        });
        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
        // Workers: a fixed number of scoped tasks that all drain the same filter channel.
        let filters = cx.background_spawn(async move {
            // Shadow with references so that the `move` closures below borrow rather than
            // take ownership of `fs` and `query`.
            let fs = &fs;
            let query = &query;
            executor
                .scoped(move |scope| {
                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
                        let filter_rx = filter_rx.clone();
                        scope.spawn(async move {
                            Self::filter_paths(fs, filter_rx, query)
                                .await
                                .log_with_level(log::Level::Debug);
                        })
                    }
                })
                .await;
        });
        // Collector: awaits the per-candidate receivers in the order they were queued and
        // forwards the paths that come back.
        cx.background_spawn(async move {
            let mut matched = 0;
            while let Ok(mut receiver) = output_rx.recv().await {
                // No path arrived on this candidate's channel: move on to the next one.
                let Some(path) = receiver.next().await else {
                    continue;
                };
                // The caller dropped the receiver; nobody is listening anymore.
                let Ok(_) = matching_paths_tx.send(path).await else {
                    break;
                };
                matched += 1;
                if matched == limit {
                    break;
                }
            }
            // The producer and worker tasks are owned by this task and dropped here, once
            // collection is finished.
            drop(input);
            drop(filters);
        })
        .detach();
        matching_paths_rx
    }
 739
 740    fn scan_ignored_dir<'a>(
 741        fs: &'a Arc<dyn Fs>,
 742        snapshot: &'a worktree::Snapshot,
 743        path: &'a Path,
 744        query: &'a SearchQuery,
 745        filter_tx: &'a Sender<MatchingEntry>,
 746        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 747    ) -> BoxFuture<'a, Result<()>> {
 748        async move {
 749            let abs_path = snapshot.abs_path().join(path);
 750            let Some(mut files) = fs
 751                .read_dir(&abs_path)
 752                .await
 753                .with_context(|| format!("listing ignored path {abs_path:?}"))
 754                .log_err()
 755            else {
 756                return Ok(());
 757            };
 758
 759            let mut results = Vec::new();
 760
 761            while let Some(Ok(file)) = files.next().await {
 762                let Some(metadata) = fs
 763                    .metadata(&file)
 764                    .await
 765                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 766                    .log_err()
 767                    .flatten()
 768                else {
 769                    continue;
 770                };
 771                if metadata.is_symlink || metadata.is_fifo {
 772                    continue;
 773                }
 774                results.push((
 775                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 776                    !metadata.is_dir,
 777                ))
 778            }
 779            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
 780            for (path, is_file) in results {
 781                if is_file {
 782                    if query.filters_path() {
 783                        let matched_path = if query.match_full_paths() {
 784                            let mut full_path = PathBuf::from(snapshot.root_name());
 785                            full_path.push(&path);
 786                            query.match_path(&full_path)
 787                        } else {
 788                            query.match_path(&path)
 789                        };
 790                        if !matched_path {
 791                            continue;
 792                        }
 793                    }
 794                    let (tx, rx) = oneshot::channel();
 795                    output_tx.send(rx).await?;
 796                    filter_tx
 797                        .send(MatchingEntry {
 798                            respond: tx,
 799                            worktree_path: snapshot.abs_path().clone(),
 800                            path: ProjectPath {
 801                                worktree_id: snapshot.id(),
 802                                path: Arc::from(path),
 803                            },
 804                        })
 805                        .await?;
 806                } else {
 807                    Self::scan_ignored_dir(fs, snapshot, &path, query, filter_tx, output_tx)
 808                        .await?;
 809                }
 810            }
 811            Ok(())
 812        }
 813        .boxed()
 814    }
 815
 816    async fn find_candidate_paths(
 817        fs: Arc<dyn Fs>,
 818        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 819        open_entries: HashSet<ProjectEntryId>,
 820        query: SearchQuery,
 821        filter_tx: Sender<MatchingEntry>,
 822        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 823    ) -> Result<()> {
 824        for (snapshot, settings) in snapshots {
 825            for entry in snapshot.entries(query.include_ignored(), 0) {
 826                if entry.is_dir() && entry.is_ignored {
 827                    if !settings.is_path_excluded(&entry.path) {
 828                        Self::scan_ignored_dir(
 829                            &fs,
 830                            &snapshot,
 831                            &entry.path,
 832                            &query,
 833                            &filter_tx,
 834                            &output_tx,
 835                        )
 836                        .await?;
 837                    }
 838                    continue;
 839                }
 840
 841                if entry.is_fifo || !entry.is_file() {
 842                    continue;
 843                }
 844
 845                if query.filters_path() {
 846                    let matched_path = if query.match_full_paths() {
 847                        let mut full_path = PathBuf::from(snapshot.root_name());
 848                        full_path.push(&entry.path);
 849                        query.match_path(&full_path)
 850                    } else {
 851                        query.match_path(&entry.path)
 852                    };
 853                    if !matched_path {
 854                        continue;
 855                    }
 856                }
 857
 858                let (mut tx, rx) = oneshot::channel();
 859
 860                if open_entries.contains(&entry.id) {
 861                    tx.send(ProjectPath {
 862                        worktree_id: snapshot.id(),
 863                        path: entry.path.clone(),
 864                    })
 865                    .await?;
 866                } else {
 867                    filter_tx
 868                        .send(MatchingEntry {
 869                            respond: tx,
 870                            worktree_path: snapshot.abs_path().clone(),
 871                            path: ProjectPath {
 872                                worktree_id: snapshot.id(),
 873                                path: entry.path.clone(),
 874                            },
 875                        })
 876                        .await?;
 877                }
 878
 879                output_tx.send(rx).await?;
 880            }
 881        }
 882        Ok(())
 883    }
 884
 885    async fn filter_paths(
 886        fs: &Arc<dyn Fs>,
 887        input: Receiver<MatchingEntry>,
 888        query: &SearchQuery,
 889    ) -> Result<()> {
 890        let mut input = pin!(input);
 891        while let Some(mut entry) = input.next().await {
 892            let abs_path = entry.worktree_path.join(&entry.path.path);
 893            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
 894                continue;
 895            };
 896
 897            let mut file = BufReader::new(file);
 898            let file_start = file.fill_buf()?;
 899
 900            if let Err(Some(starting_position)) =
 901                std::str::from_utf8(file_start).map_err(|e| e.error_len())
 902            {
 903                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
 904                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
 905                log::debug!(
 906                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
 907                );
 908                continue;
 909            }
 910
 911            if query.detect(file).unwrap_or(false) {
 912                entry.respond.send(entry.path).await?
 913            }
 914        }
 915
 916        Ok(())
 917    }
 918
 919    pub async fn handle_create_project_entry(
 920        this: Entity<Self>,
 921        envelope: TypedEnvelope<proto::CreateProjectEntry>,
 922        mut cx: AsyncApp,
 923    ) -> Result<proto::ProjectEntryResponse> {
 924        let worktree = this.update(&mut cx, |this, cx| {
 925            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 926            this.worktree_for_id(worktree_id, cx)
 927                .context("worktree not found")
 928        })??;
 929        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
 930    }
 931
 932    pub async fn handle_copy_project_entry(
 933        this: Entity<Self>,
 934        envelope: TypedEnvelope<proto::CopyProjectEntry>,
 935        mut cx: AsyncApp,
 936    ) -> Result<proto::ProjectEntryResponse> {
 937        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 938        let worktree = this.update(&mut cx, |this, cx| {
 939            this.worktree_for_entry(entry_id, cx)
 940                .context("worktree not found")
 941        })??;
 942        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
 943    }
 944
 945    pub async fn handle_delete_project_entry(
 946        this: Entity<Self>,
 947        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
 948        mut cx: AsyncApp,
 949    ) -> Result<proto::ProjectEntryResponse> {
 950        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 951        let worktree = this.update(&mut cx, |this, cx| {
 952            this.worktree_for_entry(entry_id, cx)
 953                .context("worktree not found")
 954        })??;
 955        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
 956    }
 957
 958    pub async fn handle_expand_project_entry(
 959        this: Entity<Self>,
 960        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
 961        mut cx: AsyncApp,
 962    ) -> Result<proto::ExpandProjectEntryResponse> {
 963        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 964        let worktree = this
 965            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
 966            .context("invalid request")?;
 967        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
 968    }
 969
 970    pub async fn handle_expand_all_for_project_entry(
 971        this: Entity<Self>,
 972        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
 973        mut cx: AsyncApp,
 974    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
 975        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 976        let worktree = this
 977            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
 978            .context("invalid request")?;
 979        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
 980    }
 981
 982    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
 983        match &self.state {
 984            WorktreeStoreState::Local { fs } => Some(fs.clone()),
 985            WorktreeStoreState::Remote { .. } => None,
 986        }
 987    }
 988}
 989
 990#[derive(Clone, Debug)]
 991enum WorktreeHandle {
 992    Strong(Entity<Worktree>),
 993    Weak(WeakEntity<Worktree>),
 994}
 995
 996impl WorktreeHandle {
 997    fn upgrade(&self) -> Option<Entity<Worktree>> {
 998        match self {
 999            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1000            WorktreeHandle::Weak(handle) => handle.upgrade(),
1001        }
1002    }
1003}