worktree_store.rs

  1use std::{
  2    io::{BufRead, BufReader},
  3    path::{Path, PathBuf},
  4    pin::pin,
  5    sync::{Arc, atomic::AtomicUsize},
  6};
  7
  8use anyhow::{Context as _, Result, anyhow};
  9use collections::{HashMap, HashSet};
 10use fs::Fs;
 11use futures::{
 12    FutureExt, SinkExt,
 13    future::{BoxFuture, Shared},
 14};
 15use gpui::{
 16    App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
 17};
 18use postage::oneshot;
 19use rpc::{
 20    AnyProtoClient, ErrorExt, TypedEnvelope,
 21    proto::{self, FromProto, SSH_PROJECT_ID, ToProto},
 22};
 23use smol::{
 24    channel::{Receiver, Sender},
 25    stream::StreamExt,
 26};
 27use text::ReplicaId;
 28use util::{
 29    ResultExt,
 30    paths::{PathStyle, RemotePathBuf, SanitizedPath},
 31};
 32use worktree::{
 33    Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree, WorktreeId,
 34    WorktreeSettings,
 35};
 36
 37use crate::{ProjectPath, search::SearchQuery};
 38
 39struct MatchingEntry {
 40    worktree_path: Arc<Path>,
 41    path: ProjectPath,
 42    respond: oneshot::Sender<ProjectPath>,
 43}
 44
 45enum WorktreeStoreState {
 46    Local {
 47        fs: Arc<dyn Fs>,
 48    },
 49    Remote {
 50        upstream_client: AnyProtoClient,
 51        upstream_project_id: u64,
 52        path_style: PathStyle,
 53    },
 54}
 55
 56pub struct WorktreeStore {
 57    next_entry_id: Arc<AtomicUsize>,
 58    downstream_client: Option<(AnyProtoClient, u64)>,
 59    retain_worktrees: bool,
 60    worktrees: Vec<WorktreeHandle>,
 61    worktrees_reordered: bool,
 62    #[allow(clippy::type_complexity)]
 63    loading_worktrees:
 64        HashMap<SanitizedPath, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
 65    state: WorktreeStoreState,
 66}
 67
 68#[derive(Debug)]
 69pub enum WorktreeStoreEvent {
 70    WorktreeAdded(Entity<Worktree>),
 71    WorktreeRemoved(EntityId, WorktreeId),
 72    WorktreeReleased(EntityId, WorktreeId),
 73    WorktreeOrderChanged,
 74    WorktreeUpdateSent(Entity<Worktree>),
 75    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
 76    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
 77    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
 78}
 79
 80impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
 81
 82impl WorktreeStore {
 83    pub fn init(client: &AnyProtoClient) {
 84        client.add_entity_request_handler(Self::handle_create_project_entry);
 85        client.add_entity_request_handler(Self::handle_copy_project_entry);
 86        client.add_entity_request_handler(Self::handle_delete_project_entry);
 87        client.add_entity_request_handler(Self::handle_expand_project_entry);
 88        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
 89    }
 90
 91    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
 92        Self {
 93            next_entry_id: Default::default(),
 94            loading_worktrees: Default::default(),
 95            downstream_client: None,
 96            worktrees: Vec::new(),
 97            worktrees_reordered: false,
 98            retain_worktrees,
 99            state: WorktreeStoreState::Local { fs },
100        }
101    }
102
103    pub fn remote(
104        retain_worktrees: bool,
105        upstream_client: AnyProtoClient,
106        upstream_project_id: u64,
107        path_style: PathStyle,
108    ) -> Self {
109        Self {
110            next_entry_id: Default::default(),
111            loading_worktrees: Default::default(),
112            downstream_client: None,
113            worktrees: Vec::new(),
114            worktrees_reordered: false,
115            retain_worktrees,
116            state: WorktreeStoreState::Remote {
117                upstream_client,
118                upstream_project_id,
119                path_style,
120            },
121        }
122    }
123
124    /// Iterates through all worktrees, including ones that don't appear in the project panel
125    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
126        self.worktrees
127            .iter()
128            .filter_map(move |worktree| worktree.upgrade())
129    }
130
131    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
132    pub fn visible_worktrees<'a>(
133        &'a self,
134        cx: &'a App,
135    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
136        self.worktrees()
137            .filter(|worktree| worktree.read(cx).is_visible())
138    }
139
140    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
141        self.worktrees()
142            .find(|worktree| worktree.read(cx).id() == id)
143    }
144
145    pub fn worktree_for_entry(
146        &self,
147        entry_id: ProjectEntryId,
148        cx: &App,
149    ) -> Option<Entity<Worktree>> {
150        self.worktrees()
151            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
152    }
153
154    pub fn find_worktree(
155        &self,
156        abs_path: impl Into<SanitizedPath>,
157        cx: &App,
158    ) -> Option<(Entity<Worktree>, PathBuf)> {
159        let abs_path: SanitizedPath = abs_path.into();
160        for tree in self.worktrees() {
161            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
162                return Some((tree.clone(), relative_path.into()));
163            }
164        }
165        None
166    }
167
168    pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
169        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
170        worktree.read(cx).absolutize(&project_path.path).ok()
171    }
172
173    pub fn find_or_create_worktree(
174        &mut self,
175        abs_path: impl AsRef<Path>,
176        visible: bool,
177        cx: &mut Context<Self>,
178    ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
179        let abs_path = abs_path.as_ref();
180        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
181            Task::ready(Ok((tree, relative_path)))
182        } else {
183            let worktree = self.create_worktree(abs_path, visible, cx);
184            cx.background_spawn(async move { Ok((worktree.await?, PathBuf::new())) })
185        }
186    }
187
188    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
189        self.worktrees()
190            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
191    }
192
193    pub fn worktree_and_entry_for_id<'a>(
194        &'a self,
195        entry_id: ProjectEntryId,
196        cx: &'a App,
197    ) -> Option<(Entity<Worktree>, &'a Entry)> {
198        self.worktrees().find_map(|worktree| {
199            worktree
200                .read(cx)
201                .entry_for_id(entry_id)
202                .map(|e| (worktree.clone(), e))
203        })
204    }
205
206    pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
207        self.worktree_for_id(path.worktree_id, cx)?
208            .read(cx)
209            .entry_for_path(&path.path)
210    }
211
212    pub fn create_worktree(
213        &mut self,
214        abs_path: impl Into<SanitizedPath>,
215        visible: bool,
216        cx: &mut Context<Self>,
217    ) -> Task<Result<Entity<Worktree>>> {
218        let abs_path: SanitizedPath = abs_path.into();
219        if !self.loading_worktrees.contains_key(&abs_path) {
220            let task = match &self.state {
221                WorktreeStoreState::Remote {
222                    upstream_client,
223                    path_style,
224                    ..
225                } => {
226                    if upstream_client.is_via_collab() {
227                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
228                    } else {
229                        let abs_path =
230                            RemotePathBuf::new(abs_path.as_path().to_path_buf(), *path_style);
231                        self.create_ssh_worktree(upstream_client.clone(), abs_path, visible, cx)
232                    }
233                }
234                WorktreeStoreState::Local { fs } => {
235                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
236                }
237            };
238
239            self.loading_worktrees
240                .insert(abs_path.clone(), task.shared());
241        }
242        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
243        cx.spawn(async move |this, cx| {
244            let result = task.await;
245            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
246                .ok();
247            match result {
248                Ok(worktree) => Ok(worktree),
249                Err(err) => Err((*err).cloned()),
250            }
251        })
252    }
253
254    fn create_ssh_worktree(
255        &mut self,
256        client: AnyProtoClient,
257        abs_path: RemotePathBuf,
258        visible: bool,
259        cx: &mut Context<Self>,
260    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
261        let path_style = abs_path.path_style();
262        let mut abs_path = abs_path.to_string();
263        // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
264        // in which case want to strip the leading the `/`.
265        // On the host-side, the `~` will get expanded.
266        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
267        if abs_path.starts_with("/~") {
268            abs_path = abs_path[1..].to_string();
269        }
270        if abs_path.is_empty() {
271            abs_path = "~/".to_string();
272        }
273
274        cx.spawn(async move |this, cx| {
275            let this = this.upgrade().context("Dropped worktree store")?;
276
277            let path = RemotePathBuf::new(abs_path.into(), path_style);
278            let response = client
279                .request(proto::AddWorktree {
280                    project_id: SSH_PROJECT_ID,
281                    path: path.to_proto(),
282                    visible,
283                })
284                .await?;
285
286            if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
287                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
288            })? {
289                return Ok(existing_worktree);
290            }
291
292            let root_path_buf = PathBuf::from_proto(response.canonicalized_path.clone());
293            let root_name = root_path_buf
294                .file_name()
295                .map(|n| n.to_string_lossy().to_string())
296                .unwrap_or(root_path_buf.to_string_lossy().to_string());
297
298            let worktree = cx.update(|cx| {
299                Worktree::remote(
300                    SSH_PROJECT_ID,
301                    0,
302                    proto::WorktreeMetadata {
303                        id: response.worktree_id,
304                        root_name,
305                        visible,
306                        abs_path: response.canonicalized_path,
307                    },
308                    client,
309                    cx,
310                )
311            })?;
312
313            this.update(cx, |this, cx| {
314                this.add(&worktree, cx);
315            })?;
316            Ok(worktree)
317        })
318    }
319
320    fn create_local_worktree(
321        &mut self,
322        fs: Arc<dyn Fs>,
323        abs_path: impl Into<SanitizedPath>,
324        visible: bool,
325        cx: &mut Context<Self>,
326    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
327        let next_entry_id = self.next_entry_id.clone();
328        let path: SanitizedPath = abs_path.into();
329
330        cx.spawn(async move |this, cx| {
331            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, cx).await;
332
333            let worktree = worktree?;
334
335            this.update(cx, |this, cx| this.add(&worktree, cx))?;
336
337            if visible {
338                cx.update(|cx| {
339                    cx.add_recent_document(path.as_path());
340                })
341                .log_err();
342            }
343
344            Ok(worktree)
345        })
346    }
347
348    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
349        let worktree_id = worktree.read(cx).id();
350        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
351
352        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
353        let handle = if push_strong_handle {
354            WorktreeHandle::Strong(worktree.clone())
355        } else {
356            WorktreeHandle::Weak(worktree.downgrade())
357        };
358        if self.worktrees_reordered {
359            self.worktrees.push(handle);
360        } else {
361            let i = match self
362                .worktrees
363                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
364                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
365                }) {
366                Ok(i) | Err(i) => i,
367            };
368            self.worktrees.insert(i, handle);
369        }
370
371        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
372        self.send_project_updates(cx);
373
374        let handle_id = worktree.entity_id();
375        cx.subscribe(worktree, |_, worktree, event, cx| {
376            let worktree_id = worktree.read(cx).id();
377            match event {
378                worktree::Event::UpdatedEntries(changes) => {
379                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
380                        worktree_id,
381                        changes.clone(),
382                    ));
383                }
384                worktree::Event::UpdatedGitRepositories(set) => {
385                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
386                        worktree_id,
387                        set.clone(),
388                    ));
389                }
390                worktree::Event::DeletedEntry(id) => {
391                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
392                }
393            }
394        })
395        .detach();
396        cx.observe_release(worktree, move |this, worktree, cx| {
397            cx.emit(WorktreeStoreEvent::WorktreeReleased(
398                handle_id,
399                worktree.id(),
400            ));
401            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
402                handle_id,
403                worktree.id(),
404            ));
405            this.send_project_updates(cx);
406        })
407        .detach();
408    }
409
410    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
411        self.worktrees.retain(|worktree| {
412            if let Some(worktree) = worktree.upgrade() {
413                if worktree.read(cx).id() == id_to_remove {
414                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
415                        worktree.entity_id(),
416                        id_to_remove,
417                    ));
418                    false
419                } else {
420                    true
421                }
422            } else {
423                false
424            }
425        });
426        self.send_project_updates(cx);
427    }
428
429    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
430        self.worktrees_reordered = worktrees_reordered;
431    }
432
433    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
434        match &self.state {
435            WorktreeStoreState::Remote {
436                upstream_client,
437                upstream_project_id,
438                ..
439            } => Some((upstream_client.clone(), *upstream_project_id)),
440            WorktreeStoreState::Local { .. } => None,
441        }
442    }
443
444    pub fn set_worktrees_from_proto(
445        &mut self,
446        worktrees: Vec<proto::WorktreeMetadata>,
447        replica_id: ReplicaId,
448        cx: &mut Context<Self>,
449    ) -> Result<()> {
450        let mut old_worktrees_by_id = self
451            .worktrees
452            .drain(..)
453            .filter_map(|worktree| {
454                let worktree = worktree.upgrade()?;
455                Some((worktree.read(cx).id(), worktree))
456            })
457            .collect::<HashMap<_, _>>();
458
459        let (client, project_id) = self.upstream_client().context("invalid project")?;
460
461        for worktree in worktrees {
462            if let Some(old_worktree) =
463                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
464            {
465                let push_strong_handle =
466                    self.retain_worktrees || old_worktree.read(cx).is_visible();
467                let handle = if push_strong_handle {
468                    WorktreeHandle::Strong(old_worktree.clone())
469                } else {
470                    WorktreeHandle::Weak(old_worktree.downgrade())
471                };
472                self.worktrees.push(handle);
473            } else {
474                self.add(
475                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
476                    cx,
477                );
478            }
479        }
480        self.send_project_updates(cx);
481
482        Ok(())
483    }
484
485    pub fn move_worktree(
486        &mut self,
487        source: WorktreeId,
488        destination: WorktreeId,
489        cx: &mut Context<Self>,
490    ) -> Result<()> {
491        if source == destination {
492            return Ok(());
493        }
494
495        let mut source_index = None;
496        let mut destination_index = None;
497        for (i, worktree) in self.worktrees.iter().enumerate() {
498            if let Some(worktree) = worktree.upgrade() {
499                let worktree_id = worktree.read(cx).id();
500                if worktree_id == source {
501                    source_index = Some(i);
502                    if destination_index.is_some() {
503                        break;
504                    }
505                } else if worktree_id == destination {
506                    destination_index = Some(i);
507                    if source_index.is_some() {
508                        break;
509                    }
510                }
511            }
512        }
513
514        let source_index =
515            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
516        let destination_index =
517            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
518
519        if source_index == destination_index {
520            return Ok(());
521        }
522
523        let worktree_to_move = self.worktrees.remove(source_index);
524        self.worktrees.insert(destination_index, worktree_to_move);
525        self.worktrees_reordered = true;
526        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
527        cx.notify();
528        Ok(())
529    }
530
531    pub fn disconnected_from_host(&mut self, cx: &mut App) {
532        for worktree in &self.worktrees {
533            if let Some(worktree) = worktree.upgrade() {
534                worktree.update(cx, |worktree, _| {
535                    if let Some(worktree) = worktree.as_remote_mut() {
536                        worktree.disconnected_from_host();
537                    }
538                });
539            }
540        }
541    }
542
543    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
544        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
545            return;
546        };
547
548        let update = proto::UpdateProject {
549            project_id,
550            worktrees: self.worktree_metadata_protos(cx),
551        };
552
553        // collab has bad concurrency guarantees, so we send requests in serial.
554        let update_project = if downstream_client.is_via_collab() {
555            Some(downstream_client.request(update))
556        } else {
557            downstream_client.send(update).log_err();
558            None
559        };
560        cx.spawn(async move |this, cx| {
561            if let Some(update_project) = update_project {
562                update_project.await?;
563            }
564
565            this.update(cx, |this, cx| {
566                let worktrees = this.worktrees().collect::<Vec<_>>();
567
568                for worktree in worktrees {
569                    worktree.update(cx, |worktree, cx| {
570                        let client = downstream_client.clone();
571                        worktree.observe_updates(project_id, cx, {
572                            move |update| {
573                                let client = client.clone();
574                                async move {
575                                    if client.is_via_collab() {
576                                        client
577                                            .request(update)
578                                            .map(|result| result.log_err().is_some())
579                                            .await
580                                    } else {
581                                        client.send(update).log_err().is_some()
582                                    }
583                                }
584                            }
585                        });
586                    });
587
588                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
589                }
590
591                anyhow::Ok(())
592            })
593        })
594        .detach_and_log_err(cx);
595    }
596
597    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
598        self.worktrees()
599            .map(|worktree| {
600                let worktree = worktree.read(cx);
601                proto::WorktreeMetadata {
602                    id: worktree.id().to_proto(),
603                    root_name: worktree.root_name().into(),
604                    visible: worktree.is_visible(),
605                    abs_path: worktree.abs_path().to_proto(),
606                }
607            })
608            .collect()
609    }
610
611    pub fn shared(
612        &mut self,
613        remote_id: u64,
614        downstream_client: AnyProtoClient,
615        cx: &mut Context<Self>,
616    ) {
617        self.retain_worktrees = true;
618        self.downstream_client = Some((downstream_client, remote_id));
619
620        // When shared, retain all worktrees
621        for worktree_handle in self.worktrees.iter_mut() {
622            match worktree_handle {
623                WorktreeHandle::Strong(_) => {}
624                WorktreeHandle::Weak(worktree) => {
625                    if let Some(worktree) = worktree.upgrade() {
626                        *worktree_handle = WorktreeHandle::Strong(worktree);
627                    }
628                }
629            }
630        }
631        self.send_project_updates(cx);
632    }
633
634    pub fn unshared(&mut self, cx: &mut Context<Self>) {
635        self.retain_worktrees = false;
636        self.downstream_client.take();
637
638        // When not shared, only retain the visible worktrees
639        for worktree_handle in self.worktrees.iter_mut() {
640            if let WorktreeHandle::Strong(worktree) = worktree_handle {
641                let is_visible = worktree.update(cx, |worktree, _| {
642                    worktree.stop_observing_updates();
643                    worktree.is_visible()
644                });
645                if !is_visible {
646                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
647                }
648            }
649        }
650    }
651
652    /// search over all worktrees and return buffers that *might* match the search.
653    pub fn find_search_candidates(
654        &self,
655        query: SearchQuery,
656        limit: usize,
657        open_entries: HashSet<ProjectEntryId>,
658        fs: Arc<dyn Fs>,
659        cx: &Context<Self>,
660    ) -> Receiver<ProjectPath> {
661        let snapshots = self
662            .visible_worktrees(cx)
663            .filter_map(|tree| {
664                let tree = tree.read(cx);
665                Some((tree.snapshot(), tree.as_local()?.settings()))
666            })
667            .collect::<Vec<_>>();
668
669        let executor = cx.background_executor().clone();
670
671        // We want to return entries in the order they are in the worktrees, so we have one
672        // thread that iterates over the worktrees (and ignored directories) as necessary,
673        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
674        // channel.
675        // We spawn a number of workers that take items from the filter channel and check the query
676        // against the version of the file on disk.
677        let (filter_tx, filter_rx) = smol::channel::bounded(64);
678        let (output_tx, output_rx) = smol::channel::bounded(64);
679        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
680
681        let input = cx.background_spawn({
682            let fs = fs.clone();
683            let query = query.clone();
684            async move {
685                Self::find_candidate_paths(
686                    fs,
687                    snapshots,
688                    open_entries,
689                    query,
690                    filter_tx,
691                    output_tx,
692                )
693                .await
694                .log_err();
695            }
696        });
697        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
698        let filters = cx.background_spawn(async move {
699            let fs = &fs;
700            let query = &query;
701            executor
702                .scoped(move |scope| {
703                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
704                        let filter_rx = filter_rx.clone();
705                        scope.spawn(async move {
706                            Self::filter_paths(fs, filter_rx, query)
707                                .await
708                                .log_with_level(log::Level::Debug);
709                        })
710                    }
711                })
712                .await;
713        });
714        cx.background_spawn(async move {
715            let mut matched = 0;
716            while let Ok(mut receiver) = output_rx.recv().await {
717                let Some(path) = receiver.next().await else {
718                    continue;
719                };
720                let Ok(_) = matching_paths_tx.send(path).await else {
721                    break;
722                };
723                matched += 1;
724                if matched == limit {
725                    break;
726                }
727            }
728            drop(input);
729            drop(filters);
730        })
731        .detach();
732        matching_paths_rx
733    }
734
735    fn scan_ignored_dir<'a>(
736        fs: &'a Arc<dyn Fs>,
737        snapshot: &'a worktree::Snapshot,
738        path: &'a Path,
739        query: &'a SearchQuery,
740        filter_tx: &'a Sender<MatchingEntry>,
741        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
742    ) -> BoxFuture<'a, Result<()>> {
743        async move {
744            let abs_path = snapshot.abs_path().join(path);
745            let Some(mut files) = fs
746                .read_dir(&abs_path)
747                .await
748                .with_context(|| format!("listing ignored path {abs_path:?}"))
749                .log_err()
750            else {
751                return Ok(());
752            };
753
754            let mut results = Vec::new();
755
756            while let Some(Ok(file)) = files.next().await {
757                let Some(metadata) = fs
758                    .metadata(&file)
759                    .await
760                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
761                    .log_err()
762                    .flatten()
763                else {
764                    continue;
765                };
766                if metadata.is_symlink || metadata.is_fifo {
767                    continue;
768                }
769                results.push((
770                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
771                    !metadata.is_dir,
772                ))
773            }
774            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
775            for (path, is_file) in results {
776                if is_file {
777                    if query.filters_path() {
778                        let matched_path = if query.match_full_paths() {
779                            let mut full_path = PathBuf::from(snapshot.root_name());
780                            full_path.push(&path);
781                            query.match_path(&full_path)
782                        } else {
783                            query.match_path(&path)
784                        };
785                        if !matched_path {
786                            continue;
787                        }
788                    }
789                    let (tx, rx) = oneshot::channel();
790                    output_tx.send(rx).await?;
791                    filter_tx
792                        .send(MatchingEntry {
793                            respond: tx,
794                            worktree_path: snapshot.abs_path().clone(),
795                            path: ProjectPath {
796                                worktree_id: snapshot.id(),
797                                path: Arc::from(path),
798                            },
799                        })
800                        .await?;
801                } else {
802                    Self::scan_ignored_dir(fs, snapshot, &path, query, filter_tx, output_tx)
803                        .await?;
804                }
805            }
806            Ok(())
807        }
808        .boxed()
809    }
810
811    async fn find_candidate_paths(
812        fs: Arc<dyn Fs>,
813        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
814        open_entries: HashSet<ProjectEntryId>,
815        query: SearchQuery,
816        filter_tx: Sender<MatchingEntry>,
817        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
818    ) -> Result<()> {
819        for (snapshot, settings) in snapshots {
820            for entry in snapshot.entries(query.include_ignored(), 0) {
821                if entry.is_dir() && entry.is_ignored {
822                    if !settings.is_path_excluded(&entry.path) {
823                        Self::scan_ignored_dir(
824                            &fs,
825                            &snapshot,
826                            &entry.path,
827                            &query,
828                            &filter_tx,
829                            &output_tx,
830                        )
831                        .await?;
832                    }
833                    continue;
834                }
835
836                if entry.is_fifo || !entry.is_file() {
837                    continue;
838                }
839
840                if query.filters_path() {
841                    let matched_path = if query.match_full_paths() {
842                        let mut full_path = PathBuf::from(snapshot.root_name());
843                        full_path.push(&entry.path);
844                        query.match_path(&full_path)
845                    } else {
846                        query.match_path(&entry.path)
847                    };
848                    if !matched_path {
849                        continue;
850                    }
851                }
852
853                let (mut tx, rx) = oneshot::channel();
854
855                if open_entries.contains(&entry.id) {
856                    tx.send(ProjectPath {
857                        worktree_id: snapshot.id(),
858                        path: entry.path.clone(),
859                    })
860                    .await?;
861                } else {
862                    filter_tx
863                        .send(MatchingEntry {
864                            respond: tx,
865                            worktree_path: snapshot.abs_path().clone(),
866                            path: ProjectPath {
867                                worktree_id: snapshot.id(),
868                                path: entry.path.clone(),
869                            },
870                        })
871                        .await?;
872                }
873
874                output_tx.send(rx).await?;
875            }
876        }
877        Ok(())
878    }
879
880    async fn filter_paths(
881        fs: &Arc<dyn Fs>,
882        input: Receiver<MatchingEntry>,
883        query: &SearchQuery,
884    ) -> Result<()> {
885        let mut input = pin!(input);
886        while let Some(mut entry) = input.next().await {
887            let abs_path = entry.worktree_path.join(&entry.path.path);
888            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
889                continue;
890            };
891
892            let mut file = BufReader::new(file);
893            let file_start = file.fill_buf()?;
894
895            if let Err(Some(starting_position)) =
896                std::str::from_utf8(file_start).map_err(|e| e.error_len())
897            {
898                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
899                // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
900                log::debug!(
901                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
902                );
903                continue;
904            }
905
906            if query.detect(file).unwrap_or(false) {
907                entry.respond.send(entry.path).await?
908            }
909        }
910
911        Ok(())
912    }
913
914    pub async fn handle_create_project_entry(
915        this: Entity<Self>,
916        envelope: TypedEnvelope<proto::CreateProjectEntry>,
917        mut cx: AsyncApp,
918    ) -> Result<proto::ProjectEntryResponse> {
919        let worktree = this.update(&mut cx, |this, cx| {
920            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
921            this.worktree_for_id(worktree_id, cx)
922                .context("worktree not found")
923        })??;
924        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
925    }
926
927    pub async fn handle_copy_project_entry(
928        this: Entity<Self>,
929        envelope: TypedEnvelope<proto::CopyProjectEntry>,
930        mut cx: AsyncApp,
931    ) -> Result<proto::ProjectEntryResponse> {
932        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
933        let worktree = this.update(&mut cx, |this, cx| {
934            this.worktree_for_entry(entry_id, cx)
935                .context("worktree not found")
936        })??;
937        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
938    }
939
940    pub async fn handle_delete_project_entry(
941        this: Entity<Self>,
942        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
943        mut cx: AsyncApp,
944    ) -> Result<proto::ProjectEntryResponse> {
945        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
946        let worktree = this.update(&mut cx, |this, cx| {
947            this.worktree_for_entry(entry_id, cx)
948                .context("worktree not found")
949        })??;
950        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
951    }
952
953    pub async fn handle_expand_project_entry(
954        this: Entity<Self>,
955        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
956        mut cx: AsyncApp,
957    ) -> Result<proto::ExpandProjectEntryResponse> {
958        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
959        let worktree = this
960            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
961            .context("invalid request")?;
962        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
963    }
964
965    pub async fn handle_expand_all_for_project_entry(
966        this: Entity<Self>,
967        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
968        mut cx: AsyncApp,
969    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
970        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
971        let worktree = this
972            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
973            .context("invalid request")?;
974        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
975    }
976
977    pub fn fs(&self) -> Option<Arc<dyn Fs>> {
978        match &self.state {
979            WorktreeStoreState::Local { fs } => Some(fs.clone()),
980            WorktreeStoreState::Remote { .. } => None,
981        }
982    }
983}
984
985#[derive(Clone, Debug)]
986enum WorktreeHandle {
987    Strong(Entity<Worktree>),
988    Weak(WeakEntity<Worktree>),
989}
990
991impl WorktreeHandle {
992    fn upgrade(&self) -> Option<Entity<Worktree>> {
993        match self {
994            WorktreeHandle::Strong(handle) => Some(handle.clone()),
995            WorktreeHandle::Weak(handle) => handle.upgrade(),
996        }
997    }
998}