worktree_store.rs

use std::{
    io::{BufRead, BufReader},
    path::{Path, PathBuf},
    pin::pin,
    sync::{Arc, atomic::AtomicUsize},
};

use anyhow::{Context as _, Result, anyhow};
use collections::{HashMap, HashSet};
use fs::Fs;
use futures::{
    FutureExt, SinkExt,
    future::{BoxFuture, Shared},
};
use gpui::{
    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
};
use postage::oneshot;
use rpc::{
    AnyProtoClient, ErrorExt, TypedEnvelope,
    proto::{self, FromProto, SSH_PROJECT_ID, ToProto},
};
use smol::{
    channel::{Receiver, Sender},
    stream::StreamExt,
};
use text::ReplicaId;
use util::{
    ResultExt,
    paths::{PathStyle, RemotePathBuf, SanitizedPath},
};
use worktree::{
    Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree, WorktreeId,
    WorktreeSettings,
};

use crate::{ProjectPath, search::SearchQuery};

struct MatchingEntry {
    worktree_path: Arc<Path>,
    path: ProjectPath,
    respond: oneshot::Sender<ProjectPath>,
}

enum WorktreeStoreState {
    Local {
        fs: Arc<dyn Fs>,
    },
    Remote {
        upstream_client: AnyProtoClient,
        upstream_project_id: u64,
        path_style: PathStyle,
    },
}

pub struct WorktreeStore {
    next_entry_id: Arc<AtomicUsize>,
    downstream_client: Option<(AnyProtoClient, u64)>,
    retain_worktrees: bool,
    worktrees: Vec<WorktreeHandle>,
    worktrees_reordered: bool,
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<SanitizedPath, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
    state: WorktreeStoreState,
}

#[derive(Debug)]
pub enum WorktreeStoreEvent {
    WorktreeAdded(Entity<Worktree>),
    WorktreeRemoved(EntityId, WorktreeId),
    WorktreeReleased(EntityId, WorktreeId),
    WorktreeOrderChanged,
    WorktreeUpdateSent(Entity<Worktree>),
    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
}

impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}

impl WorktreeStore {
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_create_project_entry);
        client.add_entity_request_handler(Self::handle_copy_project_entry);
        client.add_entity_request_handler(Self::handle_delete_project_entry);
        client.add_entity_request_handler(Self::handle_expand_project_entry);
        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
    }

    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
        Self {
            next_entry_id: Default::default(),
            loading_worktrees: Default::default(),
            downstream_client: None,
            worktrees: Vec::new(),
            worktrees_reordered: false,
            retain_worktrees,
            state: WorktreeStoreState::Local { fs },
        }
    }

    pub fn remote(
        retain_worktrees: bool,
        upstream_client: AnyProtoClient,
        upstream_project_id: u64,
        path_style: PathStyle,
    ) -> Self {
        Self {
            next_entry_id: Default::default(),
            loading_worktrees: Default::default(),
            downstream_client: None,
            worktrees: Vec::new(),
            worktrees_reordered: false,
            retain_worktrees,
            state: WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
                path_style,
            },
        }
    }

    /// Iterates through all worktrees, including ones that don't appear in the project panel.
    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
        self.worktrees
            .iter()
            .filter_map(move |worktree| worktree.upgrade())
    }

    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
    pub fn visible_worktrees<'a>(
        &'a self,
        cx: &'a App,
    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
        self.worktrees()
            .filter(|worktree| worktree.read(cx).is_visible())
    }

    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
        self.worktrees()
            .find(|worktree| worktree.read(cx).id() == id)
    }

    pub fn worktree_for_entry(
        &self,
        entry_id: ProjectEntryId,
        cx: &App,
    ) -> Option<Entity<Worktree>> {
        self.worktrees()
            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
    }

    pub fn find_worktree(
        &self,
        abs_path: impl Into<SanitizedPath>,
        cx: &App,
    ) -> Option<(Entity<Worktree>, PathBuf)> {
        let abs_path: SanitizedPath = abs_path.into();
        for tree in self.worktrees() {
            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
                return Some((tree.clone(), relative_path.into()));
            }
        }
        None
    }

    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
        worktree.read(cx).absolutize(&project_path.path).ok()
    }

    pub fn find_or_create_worktree(
        &mut self,
        abs_path: impl AsRef<Path>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
        let abs_path = abs_path.as_ref();
        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
            Task::ready(Ok((tree, relative_path)))
        } else {
            let worktree = self.create_worktree(abs_path, visible, cx);
            cx.background_spawn(async move { Ok((worktree.await?, PathBuf::new())) })
        }
    }

    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
        self.worktrees()
            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
    }

    pub fn worktree_and_entry_for_id<'a>(
        &'a self,
        entry_id: ProjectEntryId,
        cx: &'a App,
    ) -> Option<(Entity<Worktree>, &'a Entry)> {
        self.worktrees().find_map(|worktree| {
            worktree
                .read(cx)
                .entry_for_id(entry_id)
                .map(|e| (worktree.clone(), e))
        })
    }

    pub fn entry_for_path(&self, path: &ProjectPath, cx: &App) -> Option<Entry> {
        self.worktree_for_id(path.worktree_id, cx)?
            .read(cx)
            .entry_for_path(&path.path)
            .cloned()
    }

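    /// Starts loading a worktree rooted at `abs_path`, or returns the in-flight task if one is
    /// already loading for that path, so concurrent callers share a single worktree.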
    pub fn create_worktree(
        &mut self,
        abs_path: impl Into<SanitizedPath>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>>> {
        let abs_path: SanitizedPath = abs_path.into();
        if !self.loading_worktrees.contains_key(&abs_path) {
            let task = match &self.state {
                WorktreeStoreState::Remote {
                    upstream_client,
                    path_style,
                    ..
                } => {
                    if upstream_client.is_via_collab() {
                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
                    } else {
                        let abs_path =
                            RemotePathBuf::new(abs_path.as_path().to_path_buf(), *path_style);
                        self.create_ssh_worktree(upstream_client.clone(), abs_path, visible, cx)
                    }
                }
                WorktreeStoreState::Local { fs } => {
                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
                }
            };

            self.loading_worktrees
                .insert(abs_path.clone(), task.shared());
        }
        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
        cx.spawn(async move |this, cx| {
            let result = task.await;
            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
                .ok();
            match result {
                Ok(worktree) => Ok(worktree),
                Err(err) => Err((*err).cloned()),
            }
        })
    }

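    /// Asks the SSH host to add a worktree for `abs_path` and mirrors it locally as a remote
    /// worktree, reusing an existing worktree if the host reports an id we already track.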
    fn create_ssh_worktree(
        &mut self,
        client: AnyProtoClient,
        abs_path: RemotePathBuf,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
        let path_style = abs_path.path_style();
        let mut abs_path = abs_path.to_string();
        // If we start with `/~`, that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`,
        // in which case we want to strip the leading `/`.
        // On the host side, the `~` will get expanded.
        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
        if abs_path.starts_with("/~") {
            abs_path = abs_path[1..].to_string();
        }
        if abs_path.is_empty() {
            abs_path = "~/".to_string();
        }

        cx.spawn(async move |this, cx| {
            let this = this.upgrade().context("Dropped worktree store")?;

            let path = RemotePathBuf::new(abs_path.into(), path_style);
            let response = client
                .request(proto::AddWorktree {
                    project_id: SSH_PROJECT_ID,
                    path: path.to_proto(),
                    visible,
                })
                .await?;

            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
            })? {
                return Ok(existing_worktree);
            }

            let root_path_buf = PathBuf::from_proto(response.canonicalized_path.clone());
            let root_name = root_path_buf
                .file_name()
                .map(|n| n.to_string_lossy().to_string())
                .unwrap_or(root_path_buf.to_string_lossy().to_string());

            let worktree = cx.update(|cx| {
                Worktree::remote(
                    SSH_PROJECT_ID,
                    0,
                    proto::WorktreeMetadata {
                        id: response.worktree_id,
                        root_name,
                        visible,
                        abs_path: response.canonicalized_path,
                    },
                    client,
                    cx,
                )
            })?;

            this.update(cx, |this, cx| {
                this.add(&worktree, cx);
            })?;
            Ok(worktree)
        })
    }

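    /// Scans `abs_path` on the local filesystem, adds the resulting worktree to the store, and
    /// registers visible worktrees as recent documents with the OS.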
    fn create_local_worktree(
        &mut self,
        fs: Arc<dyn Fs>,
        abs_path: impl Into<SanitizedPath>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
        let next_entry_id = self.next_entry_id.clone();
        let path: SanitizedPath = abs_path.into();

        cx.spawn(async move |this, cx| {
            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, cx).await;

            let worktree = worktree?;

            this.update(cx, |this, cx| this.add(&worktree, cx))?;

            if visible {
                cx.update(|cx| {
                    cx.add_recent_document(path.as_path());
                })
                .log_err();
            }

            Ok(worktree)
        })
    }

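    /// Registers a worktree with the store, keeping the list ordered by absolute path unless the
    /// user has reordered worktrees manually, and forwards the worktree's events as store events.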
    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
        let worktree_id = worktree.read(cx).id();
        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));

        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
        let handle = if push_strong_handle {
            WorktreeHandle::Strong(worktree.clone())
        } else {
            WorktreeHandle::Weak(worktree.downgrade())
        };
        if self.worktrees_reordered {
            self.worktrees.push(handle);
        } else {
            let i = match self
                .worktrees
                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
                }) {
                Ok(i) | Err(i) => i,
            };
            self.worktrees.insert(i, handle);
        }

        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
        self.send_project_updates(cx);

        let handle_id = worktree.entity_id();
        cx.subscribe(worktree, |_, worktree, event, cx| {
            let worktree_id = worktree.read(cx).id();
            match event {
                worktree::Event::UpdatedEntries(changes) => {
                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
                        worktree_id,
                        changes.clone(),
                    ));
                }
                worktree::Event::UpdatedGitRepositories(set) => {
                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
                        worktree_id,
                        set.clone(),
                    ));
                }
                worktree::Event::DeletedEntry(id) => {
                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
                }
            }
        })
        .detach();
        cx.observe_release(worktree, move |this, worktree, cx| {
            cx.emit(WorktreeStoreEvent::WorktreeReleased(
                handle_id,
                worktree.id(),
            ));
            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
                handle_id,
                worktree.id(),
            ));
            this.send_project_updates(cx);
        })
        .detach();
    }

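    /// Removes the worktree with the given id from the store, emitting `WorktreeRemoved` and
    /// notifying any downstream client of the new worktree set.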
    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
        self.worktrees.retain(|worktree| {
            if let Some(worktree) = worktree.upgrade() {
                if worktree.read(cx).id() == id_to_remove {
                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
                        worktree.entity_id(),
                        id_to_remove,
                    ));
                    false
                } else {
                    true
                }
            } else {
                false
            }
        });
        self.send_project_updates(cx);
    }

    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }

    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
        match &self.state {
            WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
                ..
            } => Some((upstream_client.clone(), *upstream_project_id)),
            WorktreeStoreState::Local { .. } => None,
        }
    }

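    /// Replaces the store's worktrees with the metadata received from the upstream host,
    /// reusing existing worktree entities whose ids still match.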
    pub fn set_worktrees_from_proto(
        &mut self,
        worktrees: Vec<proto::WorktreeMetadata>,
        replica_id: ReplicaId,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        let mut old_worktrees_by_id = self
            .worktrees
            .drain(..)
            .filter_map(|worktree| {
                let worktree = worktree.upgrade()?;
                Some((worktree.read(cx).id(), worktree))
            })
            .collect::<HashMap<_, _>>();

        let (client, project_id) = self.upstream_client().context("invalid project")?;

        for worktree in worktrees {
            if let Some(old_worktree) =
                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
            {
                let push_strong_handle =
                    self.retain_worktrees || old_worktree.read(cx).is_visible();
                let handle = if push_strong_handle {
                    WorktreeHandle::Strong(old_worktree.clone())
                } else {
                    WorktreeHandle::Weak(old_worktree.downgrade())
                };
                self.worktrees.push(handle);
            } else {
                self.add(
                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
                    cx,
                );
            }
        }
        self.send_project_updates(cx);

        Ok(())
    }

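    /// Moves the worktree identified by `source` to the position currently occupied by
    /// `destination`, marking the ordering as user-defined from then on.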
    pub fn move_worktree(
        &mut self,
        source: WorktreeId,
        destination: WorktreeId,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        if source == destination {
            return Ok(());
        }

        let mut source_index = None;
        let mut destination_index = None;
        for (i, worktree) in self.worktrees.iter().enumerate() {
            if let Some(worktree) = worktree.upgrade() {
                let worktree_id = worktree.read(cx).id();
                if worktree_id == source {
                    source_index = Some(i);
                    if destination_index.is_some() {
                        break;
                    }
                } else if worktree_id == destination {
                    destination_index = Some(i);
                    if source_index.is_some() {
                        break;
                    }
                }
            }
        }

        let source_index =
            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
        let destination_index =
            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;

        if source_index == destination_index {
            return Ok(());
        }

        let worktree_to_move = self.worktrees.remove(source_index);
        self.worktrees.insert(destination_index, worktree_to_move);
        self.worktrees_reordered = true;
        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
        cx.notify();
        Ok(())
    }

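    /// Marks every remote worktree as disconnected from its host.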
    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        for worktree in &self.worktrees {
            if let Some(worktree) = worktree.upgrade() {
                worktree.update(cx, |worktree, _| {
                    if let Some(worktree) = worktree.as_remote_mut() {
                        worktree.disconnected_from_host();
                    }
                });
            }
        }
    }

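    /// Sends the current worktree metadata to the downstream client, if any, and starts
    /// observing each worktree so that subsequent updates are forwarded as well.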
    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
            return;
        };

        let update = proto::UpdateProject {
            project_id,
            worktrees: self.worktree_metadata_protos(cx),
        };

        // collab has bad concurrency guarantees, so we send requests in serial.
        let update_project = if downstream_client.is_via_collab() {
            Some(downstream_client.request(update))
        } else {
            downstream_client.send(update).log_err();
            None
        };
        cx.spawn(async move |this, cx| {
            if let Some(update_project) = update_project {
                update_project.await?;
            }

            this.update(cx, |this, cx| {
                let worktrees = this.worktrees().collect::<Vec<_>>();

                for worktree in worktrees {
                    worktree.update(cx, |worktree, cx| {
                        let client = downstream_client.clone();
                        worktree.observe_updates(project_id, cx, {
                            move |update| {
                                let client = client.clone();
                                async move {
                                    if client.is_via_collab() {
                                        client
                                            .request(update)
                                            .map(|result| result.log_err().is_some())
                                            .await
                                    } else {
                                        client.send(update).log_err().is_some()
                                    }
                                }
                            }
                        });
                    });

                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
                }

                anyhow::Ok(())
            })
        })
        .detach_and_log_err(cx);
    }

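    /// Serializes the metadata of all worktrees for transmission over RPC.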
    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
        self.worktrees()
            .map(|worktree| {
                let worktree = worktree.read(cx);
                proto::WorktreeMetadata {
                    id: worktree.id().to_proto(),
                    root_name: worktree.root_name().into(),
                    visible: worktree.is_visible(),
                    abs_path: worktree.abs_path().to_proto(),
                }
            })
            .collect()
    }

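    /// Called when the project starts being shared: upgrades weak worktree handles to strong
    /// ones and begins forwarding updates to `downstream_client`.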
    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        cx: &mut Context<Self>,
    ) {
        self.retain_worktrees = true;
        self.downstream_client = Some((downstream_client, remote_id));

        // When shared, retain all worktrees
        for worktree_handle in self.worktrees.iter_mut() {
            match worktree_handle {
                WorktreeHandle::Strong(_) => {}
                WorktreeHandle::Weak(worktree) => {
                    if let Some(worktree) = worktree.upgrade() {
                        *worktree_handle = WorktreeHandle::Strong(worktree);
                    }
                }
            }
        }
        self.send_project_updates(cx);
    }

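    /// Called when the project stops being shared: stops observing worktree updates and demotes
    /// invisible worktrees back to weak handles.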
    pub fn unshared(&mut self, cx: &mut Context<Self>) {
        self.retain_worktrees = false;
        self.downstream_client.take();

        // When not shared, only retain the visible worktrees
        for worktree_handle in self.worktrees.iter_mut() {
            if let WorktreeHandle::Strong(worktree) = worktree_handle {
                let is_visible = worktree.update(cx, |worktree, _| {
                    worktree.stop_observing_updates();
                    worktree.is_visible()
                });
                if !is_visible {
                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
                }
            }
        }
    }

    /// Searches over all visible worktrees and returns the paths of files that *might* match the query.
    pub fn find_search_candidates(
        &self,
        query: SearchQuery,
        limit: usize,
        open_entries: HashSet<ProjectEntryId>,
        fs: Arc<dyn Fs>,
        cx: &Context<Self>,
    ) -> Receiver<ProjectPath> {
        let snapshots = self
            .visible_worktrees(cx)
            .filter_map(|tree| {
                let tree = tree.read(cx);
                Some((tree.snapshot(), tree.as_local()?.settings()))
            })
            .collect::<Vec<_>>();

        let executor = cx.background_executor().clone();

        // We want to return entries in the order they are in the worktrees, so we have one
        // thread that iterates over the worktrees (and ignored directories) as necessary,
        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
        // channel.
        // We spawn a number of workers that take items from the filter channel and check the query
        // against the version of the file on disk.
        let (filter_tx, filter_rx) = smol::channel::bounded(64);
        let (output_tx, output_rx) = smol::channel::bounded(64);
        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();

        let input = cx.background_spawn({
            let fs = fs.clone();
            let query = query.clone();
            async move {
                Self::find_candidate_paths(
                    fs,
                    snapshots,
                    open_entries,
                    query,
                    filter_tx,
                    output_tx,
                )
                .await
                .log_err();
            }
        });
        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
        let filters = cx.background_spawn(async move {
            let fs = &fs;
            let query = &query;
            executor
                .scoped(move |scope| {
                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
                        let filter_rx = filter_rx.clone();
                        scope.spawn(async move {
                            Self::filter_paths(fs, filter_rx, query)
                                .await
                                .log_with_level(log::Level::Debug);
                        })
                    }
                })
                .await;
        });
        cx.background_spawn(async move {
            let mut matched = 0;
            while let Ok(mut receiver) = output_rx.recv().await {
                let Some(path) = receiver.next().await else {
                    continue;
                };
                let Ok(_) = matching_paths_tx.send(path).await else {
                    break;
                };
                matched += 1;
                if matched == limit {
                    break;
                }
            }
            drop(input);
            drop(filters);
        })
        .detach();
        matching_paths_rx
    }

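    /// Recursively lists the contents of an ignored directory on disk, applying the query's
    /// path filters and feeding candidate files into the filter channel in sorted order.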
    fn scan_ignored_dir<'a>(
        fs: &'a Arc<dyn Fs>,
        snapshot: &'a worktree::Snapshot,
        path: &'a Path,
        query: &'a SearchQuery,
        filter_tx: &'a Sender<MatchingEntry>,
        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
    ) -> BoxFuture<'a, Result<()>> {
        async move {
            let abs_path = snapshot.abs_path().join(path);
            let Some(mut files) = fs
                .read_dir(&abs_path)
                .await
                .with_context(|| format!("listing ignored path {abs_path:?}"))
                .log_err()
            else {
                return Ok(());
            };

            let mut results = Vec::new();

            while let Some(Ok(file)) = files.next().await {
                let Some(metadata) = fs
                    .metadata(&file)
                    .await
                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
                    .log_err()
                    .flatten()
                else {
                    continue;
                };
                if metadata.is_symlink || metadata.is_fifo {
                    continue;
                }
                results.push((
                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
                    !metadata.is_dir,
                ))
            }
            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
            for (path, is_file) in results {
                if is_file {
                    if query.filters_path() {
                        let matched_path = if query.match_full_paths() {
                            let mut full_path = PathBuf::from(snapshot.root_name());
                            full_path.push(&path);
                            query.match_path(&full_path)
                        } else {
                            query.match_path(&path)
                        };
                        if !matched_path {
                            continue;
                        }
                    }
                    let (tx, rx) = oneshot::channel();
                    output_tx.send(rx).await?;
                    filter_tx
                        .send(MatchingEntry {
                            respond: tx,
                            worktree_path: snapshot.abs_path().clone(),
                            path: ProjectPath {
                                worktree_id: snapshot.id(),
                                path: Arc::from(path),
                            },
                        })
                        .await?;
                } else {
                    Self::scan_ignored_dir(fs, snapshot, &path, query, filter_tx, output_tx)
                        .await?;
                }
            }
            Ok(())
        }
        .boxed()
    }

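    /// Walks the worktree snapshots in order, applying the query's path filters and forwarding
    /// candidate files to the filter workers; entries that are already open skip the on-disk
    /// content check.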
    async fn find_candidate_paths(
        fs: Arc<dyn Fs>,
        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
        open_entries: HashSet<ProjectEntryId>,
        query: SearchQuery,
        filter_tx: Sender<MatchingEntry>,
        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> Result<()> {
        for (snapshot, settings) in snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                if entry.is_dir() && entry.is_ignored {
                    if !settings.is_path_excluded(&entry.path) {
                        Self::scan_ignored_dir(
                            &fs,
                            &snapshot,
                            &entry.path,
                            &query,
                            &filter_tx,
                            &output_tx,
                        )
                        .await?;
                    }
                    continue;
                }

                if entry.is_fifo || !entry.is_file() {
                    continue;
                }

                if query.filters_path() {
                    let matched_path = if query.match_full_paths() {
                        let mut full_path = PathBuf::from(snapshot.root_name());
                        full_path.push(&entry.path);
                        query.match_path(&full_path)
                    } else {
                        query.match_path(&entry.path)
                    };
                    if !matched_path {
                        continue;
                    }
                }

                let (mut tx, rx) = oneshot::channel();

                if open_entries.contains(&entry.id) {
                    tx.send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
                } else {
                    filter_tx
                        .send(MatchingEntry {
                            respond: tx,
                            worktree_path: snapshot.abs_path().clone(),
                            path: ProjectPath {
                                worktree_id: snapshot.id(),
                                path: entry.path.clone(),
                            },
                        })
                        .await?;
                }

                output_tx.send(rx).await?;
            }
        }
        Ok(())
    }

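    /// Worker loop: opens each candidate file, skips files whose first bytes contain invalid
    /// UTF-8 (likely binary), and responds with the path if the query matches the content.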
    async fn filter_paths(
        fs: &Arc<dyn Fs>,
        input: Receiver<MatchingEntry>,
        query: &SearchQuery,
    ) -> Result<()> {
        let mut input = pin!(input);
        while let Some(mut entry) = input.next().await {
            let abs_path = entry.worktree_path.join(&entry.path.path);
            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
                continue;
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on.
                // That way we can still match files in a streaming fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                continue;
            }

            if query.detect(file).unwrap_or(false) {
                entry.respond.send(entry.path).await?
            }
        }

        Ok(())
    }

    pub async fn handle_create_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::CreateProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ProjectEntryResponse> {
        let worktree = this.update(&mut cx, |this, cx| {
            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
            this.worktree_for_id(worktree_id, cx)
                .context("worktree not found")
        })??;
        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_copy_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::CopyProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ProjectEntryResponse> {
        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
        let worktree = this.update(&mut cx, |this, cx| {
            this.worktree_for_entry(entry_id, cx)
                .context("worktree not found")
        })??;
        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_delete_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ProjectEntryResponse> {
        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
        let worktree = this.update(&mut cx, |this, cx| {
            this.worktree_for_entry(entry_id, cx)
                .context("worktree not found")
        })??;
        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_expand_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ExpandProjectEntryResponse> {
        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
        let worktree = this
            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
            .context("invalid request")?;
        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_expand_all_for_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
        let worktree = this
            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
            .context("invalid request")?;
        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
    }

    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
        match &self.state {
            WorktreeStoreState::Local { fs } => Some(fs.clone()),
            WorktreeStoreState::Remote { .. } => None,
        }
    }
}

#[derive(Clone, Debug)]
enum WorktreeHandle {
    Strong(Entity<Worktree>),
    Weak(WeakEntity<Worktree>),
}

impl WorktreeHandle {
    fn upgrade(&self) -> Option<Entity<Worktree>> {
        match self {
            WorktreeHandle::Strong(handle) => Some(handle.clone()),
            WorktreeHandle::Weak(handle) => handle.upgrade(),
        }
    }
}