worktree_store.rs

   1use std::{
   2    io::{BufRead, BufReader},
   3    path::{Path, PathBuf},
   4    pin::pin,
   5    sync::{Arc, atomic::AtomicUsize},
   6};
   7
   8use anyhow::{Context as _, Result, anyhow};
   9use collections::{HashMap, HashSet};
  10use fs::Fs;
  11use futures::{
  12    FutureExt, SinkExt,
  13    future::{BoxFuture, Shared},
  14};
  15use gpui::{
  16    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
  17};
  18use postage::oneshot;
  19use rpc::{
  20    AnyProtoClient, ErrorExt, TypedEnvelope,
  21    proto::{self, FromProto, REMOTE_SERVER_PROJECT_ID, ToProto},
  22};
  23use smol::{
  24    channel::{Receiver, Sender},
  25    stream::StreamExt,
  26};
  27use text::ReplicaId;
  28use util::{
  29    ResultExt,
  30    paths::{PathStyle, RemotePathBuf, SanitizedPath},
  31};
  32use worktree::{
  33    Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree, WorktreeId,
  34    WorktreeSettings,
  35};
  36
  37use crate::{ProjectPath, search::SearchQuery};
  38
  39struct MatchingEntry {
  40    worktree_path: Arc<Path>,
  41    path: ProjectPath,
  42    respond: oneshot::Sender<ProjectPath>,
  43}
  44
  45enum WorktreeStoreState {
  46    Local {
  47        fs: Arc<dyn Fs>,
  48    },
  49    Remote {
  50        upstream_client: AnyProtoClient,
  51        upstream_project_id: u64,
  52        path_style: PathStyle,
  53    },
  54}
  55
  56pub struct WorktreeStore {
  57    next_entry_id: Arc<AtomicUsize>,
  58    downstream_client: Option<(AnyProtoClient, u64)>,
  59    retain_worktrees: bool,
  60    worktrees: Vec<WorktreeHandle>,
  61    worktrees_reordered: bool,
  62    #[allow(clippy::type_complexity)]
  63    loading_worktrees:
  64        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
  65    state: WorktreeStoreState,
  66}
  67
  68#[derive(Debug)]
  69pub enum WorktreeStoreEvent {
  70    WorktreeAdded(Entity<Worktree>),
  71    WorktreeRemoved(EntityId, WorktreeId),
  72    WorktreeReleased(EntityId, WorktreeId),
  73    WorktreeOrderChanged,
  74    WorktreeUpdateSent(Entity<Worktree>),
  75    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
  76    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
  77    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
  78}
  79
  80impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
  81
  82impl WorktreeStore {
  83    pub fn init(client: &AnyProtoClient) {
  84        client.add_entity_request_handler(Self::handle_create_project_entry);
  85        client.add_entity_request_handler(Self::handle_copy_project_entry);
  86        client.add_entity_request_handler(Self::handle_delete_project_entry);
  87        client.add_entity_request_handler(Self::handle_expand_project_entry);
  88        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
  89    }
  90
  91    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
  92        Self {
  93            next_entry_id: Default::default(),
  94            loading_worktrees: Default::default(),
  95            downstream_client: None,
  96            worktrees: Vec::new(),
  97            worktrees_reordered: false,
  98            retain_worktrees,
  99            state: WorktreeStoreState::Local { fs },
 100        }
 101    }
 102
 103    pub fn remote(
 104        retain_worktrees: bool,
 105        upstream_client: AnyProtoClient,
 106        upstream_project_id: u64,
 107        path_style: PathStyle,
 108    ) -> Self {
 109        Self {
 110            next_entry_id: Default::default(),
 111            loading_worktrees: Default::default(),
 112            downstream_client: None,
 113            worktrees: Vec::new(),
 114            worktrees_reordered: false,
 115            retain_worktrees,
 116            state: WorktreeStoreState::Remote {
 117                upstream_client,
 118                upstream_project_id,
 119                path_style,
 120            },
 121        }
 122    }
 123
 124    /// Iterates through all worktrees, including ones that don't appear in the project panel
 125    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
 126        self.worktrees
 127            .iter()
 128            .filter_map(move |worktree| worktree.upgrade())
 129    }
 130
 131    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 132    pub fn visible_worktrees<'a>(
 133        &'a self,
 134        cx: &'a App,
 135    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
 136        self.worktrees()
 137            .filter(|worktree| worktree.read(cx).is_visible())
 138    }
 139
 140    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
 141        self.worktrees()
 142            .find(|worktree| worktree.read(cx).id() == id)
 143    }
 144
 145    pub fn worktree_for_entry(
 146        &self,
 147        entry_id: ProjectEntryId,
 148        cx: &App,
 149    ) -> Option<Entity<Worktree>> {
 150        self.worktrees()
 151            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 152    }
 153
 154    pub fn find_worktree(
 155        &self,
 156        abs_path: impl AsRef<Path>,
 157        cx: &App,
 158    ) -> Option<(Entity<Worktree>, PathBuf)> {
 159        let abs_path = SanitizedPath::new(&abs_path);
 160        for tree in self.worktrees() {
 161            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
 162                return Some((tree.clone(), relative_path.into()));
 163            }
 164        }
 165        None
 166    }
 167
 168    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
 169        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 170        worktree.read(cx).absolutize(&project_path.path).ok()
 171    }
 172
 173    pub fn find_or_create_worktree(
 174        &mut self,
 175        abs_path: impl AsRef<Path>,
 176        visible: bool,
 177        cx: &mut Context<Self>,
 178    ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
 179        let abs_path = abs_path.as_ref();
 180        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
 181            Task::ready(Ok((tree, relative_path)))
 182        } else {
 183            let worktree = self.create_worktree(abs_path, visible, cx);
 184            cx.background_spawn(async move { Ok((worktree.await?, PathBuf::new())) })
 185        }
 186    }
 187
 188    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
 189        self.worktrees()
 190            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 191    }
 192
 193    pub fn worktree_and_entry_for_id<'a>(
 194        &'a self,
 195        entry_id: ProjectEntryId,
 196        cx: &'a App,
 197    ) -> Option<(Entity<Worktree>, &'a Entry)> {
 198        self.worktrees().find_map(|worktree| {
 199            worktree
 200                .read(cx)
 201                .entry_for_id(entry_id)
 202                .map(|e| (worktree.clone(), e))
 203        })
 204    }
 205
 206    pub fn entry_for_path(&self, path: &ProjectPath, cx: &App) -> Option<Entry> {
 207        self.worktree_for_id(path.worktree_id, cx)?
 208            .read(cx)
 209            .entry_for_path(&path.path)
 210            .cloned()
 211    }
 212
 213    pub fn create_worktree(
 214        &mut self,
 215        abs_path: impl AsRef<Path>,
 216        visible: bool,
 217        cx: &mut Context<Self>,
 218    ) -> Task<Result<Entity<Worktree>>> {
 219        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
 220        if !self.loading_worktrees.contains_key(&abs_path) {
 221            let task = match &self.state {
 222                WorktreeStoreState::Remote {
 223                    upstream_client,
 224                    path_style,
 225                    ..
 226                } => {
 227                    if upstream_client.is_via_collab() {
 228                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
 229                    } else {
 230                        let abs_path = RemotePathBuf::new(abs_path.to_path_buf(), *path_style);
 231                        self.create_ssh_worktree(upstream_client.clone(), abs_path, visible, cx)
 232                    }
 233                }
 234                WorktreeStoreState::Local { fs } => {
 235                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
 236                }
 237            };
 238
 239            self.loading_worktrees
 240                .insert(abs_path.clone(), task.shared());
 241        }
 242        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
 243        cx.spawn(async move |this, cx| {
 244            let result = task.await;
 245            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
 246                .ok();
 247            match result {
 248                Ok(worktree) => Ok(worktree),
 249                Err(err) => Err((*err).cloned()),
 250            }
 251        })
 252    }
 253
 254    fn create_ssh_worktree(
 255        &mut self,
 256        client: AnyProtoClient,
 257        abs_path: RemotePathBuf,
 258        visible: bool,
 259        cx: &mut Context<Self>,
 260    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 261        let path_style = abs_path.path_style();
 262        let mut abs_path = abs_path.to_string();
 263        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
 264        // in which case want to strip the leading the `/`.
 265        // On the host-side, the `~` will get expanded.
 266        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
 267        if abs_path.starts_with("/~") {
 268            abs_path = abs_path[1..].to_string();
 269        }
 270        if abs_path.is_empty() {
 271            abs_path = "~/".to_string();
 272        }
 273
 274        cx.spawn(async move |this, cx| {
 275            let this = this.upgrade().context("Dropped worktree store")?;
 276
 277            let path = RemotePathBuf::new(abs_path.into(), path_style);
 278            let response = client
 279                .request(proto::AddWorktree {
 280                    project_id: REMOTE_SERVER_PROJECT_ID,
 281                    path: path.to_proto(),
 282                    visible,
 283                })
 284                .await?;
 285
 286            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
 287                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
 288            })? {
 289                return Ok(existing_worktree);
 290            }
 291
 292            let root_path_buf = PathBuf::from_proto(response.canonicalized_path.clone());
 293            let root_name = root_path_buf
 294                .file_name()
 295                .map(|n| n.to_string_lossy().to_string())
 296                .unwrap_or(root_path_buf.to_string_lossy().to_string());
 297
 298            let worktree = cx.update(|cx| {
 299                Worktree::remote(
 300                    REMOTE_SERVER_PROJECT_ID,
 301                    0,
 302                    proto::WorktreeMetadata {
 303                        id: response.worktree_id,
 304                        root_name,
 305                        visible,
 306                        abs_path: response.canonicalized_path,
 307                    },
 308                    client,
 309                    cx,
 310                )
 311            })?;
 312
 313            this.update(cx, |this, cx| {
 314                this.add(&worktree, cx);
 315            })?;
 316            Ok(worktree)
 317        })
 318    }
 319
 320    fn create_local_worktree(
 321        &mut self,
 322        fs: Arc<dyn Fs>,
 323        abs_path: Arc<SanitizedPath>,
 324        visible: bool,
 325        cx: &mut Context<Self>,
 326    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
 327        let next_entry_id = self.next_entry_id.clone();
 328
 329        cx.spawn(async move |this, cx| {
 330            let worktree = Worktree::local(
 331                SanitizedPath::cast_arc(abs_path.clone()),
 332                visible,
 333                fs,
 334                next_entry_id,
 335                cx,
 336            )
 337            .await;
 338
 339            let worktree = worktree?;
 340
 341            this.update(cx, |this, cx| this.add(&worktree, cx))?;
 342
 343            if visible {
 344                cx.update(|cx| {
 345                    cx.add_recent_document(abs_path.as_path());
 346                })
 347                .log_err();
 348            }
 349
 350            Ok(worktree)
 351        })
 352    }
 353
 354    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
 355        let worktree_id = worktree.read(cx).id();
 356        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
 357
 358        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
 359        let handle = if push_strong_handle {
 360            WorktreeHandle::Strong(worktree.clone())
 361        } else {
 362            WorktreeHandle::Weak(worktree.downgrade())
 363        };
 364        if self.worktrees_reordered {
 365            self.worktrees.push(handle);
 366        } else {
 367            let i = match self
 368                .worktrees
 369                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
 370                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
 371                }) {
 372                Ok(i) | Err(i) => i,
 373            };
 374            self.worktrees.insert(i, handle);
 375        }
 376
 377        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
 378        self.send_project_updates(cx);
 379
 380        let handle_id = worktree.entity_id();
 381        cx.subscribe(worktree, |_, worktree, event, cx| {
 382            let worktree_id = worktree.read(cx).id();
 383            match event {
 384                worktree::Event::UpdatedEntries(changes) => {
 385                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
 386                        worktree_id,
 387                        changes.clone(),
 388                    ));
 389                }
 390                worktree::Event::UpdatedGitRepositories(set) => {
 391                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
 392                        worktree_id,
 393                        set.clone(),
 394                    ));
 395                }
 396                worktree::Event::DeletedEntry(id) => {
 397                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
 398                }
 399            }
 400        })
 401        .detach();
 402        cx.observe_release(worktree, move |this, worktree, cx| {
 403            cx.emit(WorktreeStoreEvent::WorktreeReleased(
 404                handle_id,
 405                worktree.id(),
 406            ));
 407            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 408                handle_id,
 409                worktree.id(),
 410            ));
 411            this.send_project_updates(cx);
 412        })
 413        .detach();
 414    }
 415
 416    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
 417        self.worktrees.retain(|worktree| {
 418            if let Some(worktree) = worktree.upgrade() {
 419                if worktree.read(cx).id() == id_to_remove {
 420                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
 421                        worktree.entity_id(),
 422                        id_to_remove,
 423                    ));
 424                    false
 425                } else {
 426                    true
 427                }
 428            } else {
 429                false
 430            }
 431        });
 432        self.send_project_updates(cx);
 433    }
 434
 435    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
 436        self.worktrees_reordered = worktrees_reordered;
 437    }
 438
 439    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
 440        match &self.state {
 441            WorktreeStoreState::Remote {
 442                upstream_client,
 443                upstream_project_id,
 444                ..
 445            } => Some((upstream_client.clone(), *upstream_project_id)),
 446            WorktreeStoreState::Local { .. } => None,
 447        }
 448    }
 449
 450    pub fn set_worktrees_from_proto(
 451        &mut self,
 452        worktrees: Vec<proto::WorktreeMetadata>,
 453        replica_id: ReplicaId,
 454        cx: &mut Context<Self>,
 455    ) -> Result<()> {
 456        let mut old_worktrees_by_id = self
 457            .worktrees
 458            .drain(..)
 459            .filter_map(|worktree| {
 460                let worktree = worktree.upgrade()?;
 461                Some((worktree.read(cx).id(), worktree))
 462            })
 463            .collect::<HashMap<_, _>>();
 464
 465        let (client, project_id) = self.upstream_client().context("invalid project")?;
 466
 467        for worktree in worktrees {
 468            if let Some(old_worktree) =
 469                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
 470            {
 471                let push_strong_handle =
 472                    self.retain_worktrees || old_worktree.read(cx).is_visible();
 473                let handle = if push_strong_handle {
 474                    WorktreeHandle::Strong(old_worktree.clone())
 475                } else {
 476                    WorktreeHandle::Weak(old_worktree.downgrade())
 477                };
 478                self.worktrees.push(handle);
 479            } else {
 480                self.add(
 481                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
 482                    cx,
 483                );
 484            }
 485        }
 486        self.send_project_updates(cx);
 487
 488        Ok(())
 489    }
 490
 491    pub fn move_worktree(
 492        &mut self,
 493        source: WorktreeId,
 494        destination: WorktreeId,
 495        cx: &mut Context<Self>,
 496    ) -> Result<()> {
 497        if source == destination {
 498            return Ok(());
 499        }
 500
 501        let mut source_index = None;
 502        let mut destination_index = None;
 503        for (i, worktree) in self.worktrees.iter().enumerate() {
 504            if let Some(worktree) = worktree.upgrade() {
 505                let worktree_id = worktree.read(cx).id();
 506                if worktree_id == source {
 507                    source_index = Some(i);
 508                    if destination_index.is_some() {
 509                        break;
 510                    }
 511                } else if worktree_id == destination {
 512                    destination_index = Some(i);
 513                    if source_index.is_some() {
 514                        break;
 515                    }
 516                }
 517            }
 518        }
 519
 520        let source_index =
 521            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
 522        let destination_index =
 523            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
 524
 525        if source_index == destination_index {
 526            return Ok(());
 527        }
 528
 529        let worktree_to_move = self.worktrees.remove(source_index);
 530        self.worktrees.insert(destination_index, worktree_to_move);
 531        self.worktrees_reordered = true;
 532        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
 533        cx.notify();
 534        Ok(())
 535    }
 536
 537    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 538        for worktree in &self.worktrees {
 539            if let Some(worktree) = worktree.upgrade() {
 540                worktree.update(cx, |worktree, _| {
 541                    if let Some(worktree) = worktree.as_remote_mut() {
 542                        worktree.disconnected_from_host();
 543                    }
 544                });
 545            }
 546        }
 547    }
 548
 549    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
 550        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
 551            return;
 552        };
 553
 554        let update = proto::UpdateProject {
 555            project_id,
 556            worktrees: self.worktree_metadata_protos(cx),
 557        };
 558
 559        // collab has bad concurrency guarantees, so we send requests in serial.
 560        let update_project = if downstream_client.is_via_collab() {
 561            Some(downstream_client.request(update))
 562        } else {
 563            downstream_client.send(update).log_err();
 564            None
 565        };
 566        cx.spawn(async move |this, cx| {
 567            if let Some(update_project) = update_project {
 568                update_project.await?;
 569            }
 570
 571            this.update(cx, |this, cx| {
 572                let worktrees = this.worktrees().collect::<Vec<_>>();
 573
 574                for worktree in worktrees {
 575                    worktree.update(cx, |worktree, cx| {
 576                        let client = downstream_client.clone();
 577                        worktree.observe_updates(project_id, cx, {
 578                            move |update| {
 579                                let client = client.clone();
 580                                async move {
 581                                    if client.is_via_collab() {
 582                                        client
 583                                            .request(update)
 584                                            .map(|result| result.log_err().is_some())
 585                                            .await
 586                                    } else {
 587                                        client.send(update).log_err().is_some()
 588                                    }
 589                                }
 590                            }
 591                        });
 592                    });
 593
 594                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
 595                }
 596
 597                anyhow::Ok(())
 598            })
 599        })
 600        .detach_and_log_err(cx);
 601    }
 602
 603    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
 604        self.worktrees()
 605            .map(|worktree| {
 606                let worktree = worktree.read(cx);
 607                proto::WorktreeMetadata {
 608                    id: worktree.id().to_proto(),
 609                    root_name: worktree.root_name().into(),
 610                    visible: worktree.is_visible(),
 611                    abs_path: worktree.abs_path().to_proto(),
 612                }
 613            })
 614            .collect()
 615    }
 616
 617    pub fn shared(
 618        &mut self,
 619        remote_id: u64,
 620        downstream_client: AnyProtoClient,
 621        cx: &mut Context<Self>,
 622    ) {
 623        self.retain_worktrees = true;
 624        self.downstream_client = Some((downstream_client, remote_id));
 625
 626        // When shared, retain all worktrees
 627        for worktree_handle in self.worktrees.iter_mut() {
 628            match worktree_handle {
 629                WorktreeHandle::Strong(_) => {}
 630                WorktreeHandle::Weak(worktree) => {
 631                    if let Some(worktree) = worktree.upgrade() {
 632                        *worktree_handle = WorktreeHandle::Strong(worktree);
 633                    }
 634                }
 635            }
 636        }
 637        self.send_project_updates(cx);
 638    }
 639
 640    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 641        self.retain_worktrees = false;
 642        self.downstream_client.take();
 643
 644        // When not shared, only retain the visible worktrees
 645        for worktree_handle in self.worktrees.iter_mut() {
 646            if let WorktreeHandle::Strong(worktree) = worktree_handle {
 647                let is_visible = worktree.update(cx, |worktree, _| {
 648                    worktree.stop_observing_updates();
 649                    worktree.is_visible()
 650                });
 651                if !is_visible {
 652                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
 653                }
 654            }
 655        }
 656    }
 657
 658    /// search over all worktrees and return buffers that *might* match the search.
 659    pub fn find_search_candidates(
 660        &self,
 661        query: SearchQuery,
 662        limit: usize,
 663        open_entries: HashSet<ProjectEntryId>,
 664        fs: Arc<dyn Fs>,
 665        cx: &Context<Self>,
 666    ) -> Receiver<ProjectPath> {
 667        let snapshots = self
 668            .visible_worktrees(cx)
 669            .filter_map(|tree| {
 670                let tree = tree.read(cx);
 671                Some((tree.snapshot(), tree.as_local()?.settings()))
 672            })
 673            .collect::<Vec<_>>();
 674
 675        let executor = cx.background_executor().clone();
 676
 677        // We want to return entries in the order they are in the worktrees, so we have one
 678        // thread that iterates over the worktrees (and ignored directories) as necessary,
 679        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
 680        // channel.
 681        // We spawn a number of workers that take items from the filter channel and check the query
 682        // against the version of the file on disk.
 683        let (filter_tx, filter_rx) = smol::channel::bounded(64);
 684        let (output_tx, output_rx) = smol::channel::bounded(64);
 685        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
 686
 687        let input = cx.background_spawn({
 688            let fs = fs.clone();
 689            let query = query.clone();
 690            async move {
 691                Self::find_candidate_paths(
 692                    fs,
 693                    snapshots,
 694                    open_entries,
 695                    query,
 696                    filter_tx,
 697                    output_tx,
 698                )
 699                .await
 700                .log_err();
 701            }
 702        });
 703        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
 704        let filters = cx.background_spawn(async move {
 705            let fs = &fs;
 706            let query = &query;
 707            executor
 708                .scoped(move |scope| {
 709                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
 710                        let filter_rx = filter_rx.clone();
 711                        scope.spawn(async move {
 712                            Self::filter_paths(fs, filter_rx, query)
 713                                .await
 714                                .log_with_level(log::Level::Debug);
 715                        })
 716                    }
 717                })
 718                .await;
 719        });
 720        cx.background_spawn(async move {
 721            let mut matched = 0;
 722            while let Ok(mut receiver) = output_rx.recv().await {
 723                let Some(path) = receiver.next().await else {
 724                    continue;
 725                };
 726                let Ok(_) = matching_paths_tx.send(path).await else {
 727                    break;
 728                };
 729                matched += 1;
 730                if matched == limit {
 731                    break;
 732                }
 733            }
 734            drop(input);
 735            drop(filters);
 736        })
 737        .detach();
 738        matching_paths_rx
 739    }
 740
 741    fn scan_ignored_dir<'a>(
 742        fs: &'a Arc<dyn Fs>,
 743        snapshot: &'a worktree::Snapshot,
 744        path: &'a Path,
 745        query: &'a SearchQuery,
 746        filter_tx: &'a Sender<MatchingEntry>,
 747        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
 748    ) -> BoxFuture<'a, Result<()>> {
 749        async move {
 750            let abs_path = snapshot.abs_path().join(path);
 751            let Some(mut files) = fs
 752                .read_dir(&abs_path)
 753                .await
 754                .with_context(|| format!("listing ignored path {abs_path:?}"))
 755                .log_err()
 756            else {
 757                return Ok(());
 758            };
 759
 760            let mut results = Vec::new();
 761
 762            while let Some(Ok(file)) = files.next().await {
 763                let Some(metadata) = fs
 764                    .metadata(&file)
 765                    .await
 766                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
 767                    .log_err()
 768                    .flatten()
 769                else {
 770                    continue;
 771                };
 772                if metadata.is_symlink || metadata.is_fifo {
 773                    continue;
 774                }
 775                results.push((
 776                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
 777                    !metadata.is_dir,
 778                ))
 779            }
 780            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
 781            for (path, is_file) in results {
 782                if is_file {
 783                    if query.filters_path() {
 784                        let matched_path = if query.match_full_paths() {
 785                            let mut full_path = PathBuf::from(snapshot.root_name());
 786                            full_path.push(&path);
 787                            query.match_path(&full_path)
 788                        } else {
 789                            query.match_path(&path)
 790                        };
 791                        if !matched_path {
 792                            continue;
 793                        }
 794                    }
 795                    let (tx, rx) = oneshot::channel();
 796                    output_tx.send(rx).await?;
 797                    filter_tx
 798                        .send(MatchingEntry {
 799                            respond: tx,
 800                            worktree_path: snapshot.abs_path().clone(),
 801                            path: ProjectPath {
 802                                worktree_id: snapshot.id(),
 803                                path: Arc::from(path),
 804                            },
 805                        })
 806                        .await?;
 807                } else {
 808                    Self::scan_ignored_dir(fs, snapshot, &path, query, filter_tx, output_tx)
 809                        .await?;
 810                }
 811            }
 812            Ok(())
 813        }
 814        .boxed()
 815    }
 816
 817    async fn find_candidate_paths(
 818        fs: Arc<dyn Fs>,
 819        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
 820        open_entries: HashSet<ProjectEntryId>,
 821        query: SearchQuery,
 822        filter_tx: Sender<MatchingEntry>,
 823        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
 824    ) -> Result<()> {
 825        for (snapshot, settings) in snapshots {
 826            for entry in snapshot.entries(query.include_ignored(), 0) {
 827                if entry.is_dir() && entry.is_ignored {
 828                    if !settings.is_path_excluded(&entry.path) {
 829                        Self::scan_ignored_dir(
 830                            &fs,
 831                            &snapshot,
 832                            &entry.path,
 833                            &query,
 834                            &filter_tx,
 835                            &output_tx,
 836                        )
 837                        .await?;
 838                    }
 839                    continue;
 840                }
 841
 842                if entry.is_fifo || !entry.is_file() {
 843                    continue;
 844                }
 845
 846                if query.filters_path() {
 847                    let matched_path = if query.match_full_paths() {
 848                        let mut full_path = PathBuf::from(snapshot.root_name());
 849                        full_path.push(&entry.path);
 850                        query.match_path(&full_path)
 851                    } else {
 852                        query.match_path(&entry.path)
 853                    };
 854                    if !matched_path {
 855                        continue;
 856                    }
 857                }
 858
 859                let (mut tx, rx) = oneshot::channel();
 860
 861                if open_entries.contains(&entry.id) {
 862                    tx.send(ProjectPath {
 863                        worktree_id: snapshot.id(),
 864                        path: entry.path.clone(),
 865                    })
 866                    .await?;
 867                } else {
 868                    filter_tx
 869                        .send(MatchingEntry {
 870                            respond: tx,
 871                            worktree_path: snapshot.abs_path().clone(),
 872                            path: ProjectPath {
 873                                worktree_id: snapshot.id(),
 874                                path: entry.path.clone(),
 875                            },
 876                        })
 877                        .await?;
 878                }
 879
 880                output_tx.send(rx).await?;
 881            }
 882        }
 883        Ok(())
 884    }
 885
 886    async fn filter_paths(
 887        fs: &Arc<dyn Fs>,
 888        input: Receiver<MatchingEntry>,
 889        query: &SearchQuery,
 890    ) -> Result<()> {
 891        let mut input = pin!(input);
 892        while let Some(mut entry) = input.next().await {
 893            let abs_path = entry.worktree_path.join(&entry.path.path);
 894            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
 895                continue;
 896            };
 897
 898            let mut file = BufReader::new(file);
 899            let file_start = file.fill_buf()?;
 900
 901            if let Err(Some(starting_position)) =
 902                std::str::from_utf8(file_start).map_err(|e| e.error_len())
 903            {
 904                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
 905                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
 906                log::debug!(
 907                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
 908                );
 909                continue;
 910            }
 911
 912            if query.detect(file).unwrap_or(false) {
 913                entry.respond.send(entry.path).await?
 914            }
 915        }
 916
 917        Ok(())
 918    }
 919
 920    pub async fn handle_create_project_entry(
 921        this: Entity<Self>,
 922        envelope: TypedEnvelope<proto::CreateProjectEntry>,
 923        mut cx: AsyncApp,
 924    ) -> Result<proto::ProjectEntryResponse> {
 925        let worktree = this.update(&mut cx, |this, cx| {
 926            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 927            this.worktree_for_id(worktree_id, cx)
 928                .context("worktree not found")
 929        })??;
 930        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
 931    }
 932
 933    pub async fn handle_copy_project_entry(
 934        this: Entity<Self>,
 935        envelope: TypedEnvelope<proto::CopyProjectEntry>,
 936        mut cx: AsyncApp,
 937    ) -> Result<proto::ProjectEntryResponse> {
 938        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 939        let worktree = this.update(&mut cx, |this, cx| {
 940            this.worktree_for_entry(entry_id, cx)
 941                .context("worktree not found")
 942        })??;
 943        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
 944    }
 945
 946    pub async fn handle_delete_project_entry(
 947        this: Entity<Self>,
 948        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
 949        mut cx: AsyncApp,
 950    ) -> Result<proto::ProjectEntryResponse> {
 951        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 952        let worktree = this.update(&mut cx, |this, cx| {
 953            this.worktree_for_entry(entry_id, cx)
 954                .context("worktree not found")
 955        })??;
 956        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
 957    }
 958
 959    pub async fn handle_expand_project_entry(
 960        this: Entity<Self>,
 961        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
 962        mut cx: AsyncApp,
 963    ) -> Result<proto::ExpandProjectEntryResponse> {
 964        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 965        let worktree = this
 966            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
 967            .context("invalid request")?;
 968        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
 969    }
 970
 971    pub async fn handle_expand_all_for_project_entry(
 972        this: Entity<Self>,
 973        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
 974        mut cx: AsyncApp,
 975    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
 976        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
 977        let worktree = this
 978            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
 979            .context("invalid request")?;
 980        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
 981    }
 982
 983    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
 984        match &self.state {
 985            WorktreeStoreState::Local { fs } => Some(fs.clone()),
 986            WorktreeStoreState::Remote { .. } => None,
 987        }
 988    }
 989}
 990
 991#[derive(Clone, Debug)]
 992enum WorktreeHandle {
 993    Strong(Entity<Worktree>),
 994    Weak(WeakEntity<Worktree>),
 995}
 996
 997impl WorktreeHandle {
 998    fn upgrade(&self) -> Option<Entity<Worktree>> {
 999        match self {
1000            WorktreeHandle::Strong(handle) => Some(handle.clone()),
1001            WorktreeHandle::Weak(handle) => handle.upgrade(),
1002        }
1003    }
1004}