worktree_store.rs

  1use std::{
  2    path::{Path, PathBuf},
  3    sync::Arc,
  4};
  5
  6use anyhow::{anyhow, Context as _, Result};
  7use collections::{HashMap, HashSet};
  8use fs::Fs;
  9use futures::{future::BoxFuture, SinkExt};
 10use gpui::{AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, WeakModel};
 11use postage::oneshot;
 12use rpc::{
 13    proto::{self, AnyProtoClient},
 14    TypedEnvelope,
 15};
 16use smol::{
 17    channel::{Receiver, Sender},
 18    future::FutureExt,
 19    stream::StreamExt,
 20};
 21use text::ReplicaId;
 22use util::{paths::compare_paths, ResultExt};
 23use worktree::{Entry, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
 24
 25use crate::{search::SearchQuery, ProjectPath};
 26
 27struct MatchingEntry {
 28    worktree_path: Arc<Path>,
 29    path: ProjectPath,
 30    respond: oneshot::Sender<ProjectPath>,
 31}
 32
 33pub struct WorktreeStore {
 34    is_shared: bool,
 35    worktrees: Vec<WorktreeHandle>,
 36    worktrees_reordered: bool,
 37}
 38
 39pub enum WorktreeStoreEvent {
 40    WorktreeAdded(Model<Worktree>),
 41    WorktreeRemoved(EntityId, WorktreeId),
 42    WorktreeOrderChanged,
 43}
 44
 45impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
 46
 47impl WorktreeStore {
 48    pub fn new(retain_worktrees: bool) -> Self {
 49        Self {
 50            is_shared: retain_worktrees,
 51            worktrees: Vec::new(),
 52            worktrees_reordered: false,
 53        }
 54    }
 55
 56    /// Iterates through all worktrees, including ones that don't appear in the project panel
 57    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
 58        self.worktrees
 59            .iter()
 60            .filter_map(move |worktree| worktree.upgrade())
 61    }
 62
 63    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 64    pub fn visible_worktrees<'a>(
 65        &'a self,
 66        cx: &'a AppContext,
 67    ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
 68        self.worktrees()
 69            .filter(|worktree| worktree.read(cx).is_visible())
 70    }
 71
 72    pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
 73        self.worktrees()
 74            .find(|worktree| worktree.read(cx).id() == id)
 75    }
 76
 77    pub fn worktree_for_entry(
 78        &self,
 79        entry_id: ProjectEntryId,
 80        cx: &AppContext,
 81    ) -> Option<Model<Worktree>> {
 82        self.worktrees()
 83            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 84    }
 85
 86    pub fn entry_for_id<'a>(
 87        &'a self,
 88        entry_id: ProjectEntryId,
 89        cx: &'a AppContext,
 90    ) -> Option<&'a Entry> {
 91        self.worktrees()
 92            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 93    }
 94
 95    pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 96        let push_strong_handle = self.is_shared || worktree.read(cx).is_visible();
 97        let handle = if push_strong_handle {
 98            WorktreeHandle::Strong(worktree.clone())
 99        } else {
100            WorktreeHandle::Weak(worktree.downgrade())
101        };
102        if self.worktrees_reordered {
103            self.worktrees.push(handle);
104        } else {
105            let i = match self
106                .worktrees
107                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
108                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
109                }) {
110                Ok(i) | Err(i) => i,
111            };
112            self.worktrees.insert(i, handle);
113        }
114
115        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
116
117        let handle_id = worktree.entity_id();
118        cx.observe_release(worktree, move |_, worktree, cx| {
119            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
120                handle_id,
121                worktree.id(),
122            ));
123        })
124        .detach();
125    }
126
127    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
128        self.worktrees.retain(|worktree| {
129            if let Some(worktree) = worktree.upgrade() {
130                if worktree.read(cx).id() == id_to_remove {
131                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
132                        worktree.entity_id(),
133                        id_to_remove,
134                    ));
135                    false
136                } else {
137                    true
138                }
139            } else {
140                false
141            }
142        });
143    }
144
145    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
146        self.worktrees_reordered = worktrees_reordered;
147    }
148
149    pub fn set_worktrees_from_proto(
150        &mut self,
151        worktrees: Vec<proto::WorktreeMetadata>,
152        replica_id: ReplicaId,
153        remote_id: u64,
154        client: AnyProtoClient,
155        cx: &mut ModelContext<Self>,
156    ) -> Result<()> {
157        let mut old_worktrees_by_id = self
158            .worktrees
159            .drain(..)
160            .filter_map(|worktree| {
161                let worktree = worktree.upgrade()?;
162                Some((worktree.read(cx).id(), worktree))
163            })
164            .collect::<HashMap<_, _>>();
165
166        for worktree in worktrees {
167            if let Some(old_worktree) =
168                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
169            {
170                self.worktrees.push(WorktreeHandle::Strong(old_worktree));
171            } else {
172                self.add(
173                    &Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx),
174                    cx,
175                );
176            }
177        }
178
179        Ok(())
180    }
181
182    pub fn move_worktree(
183        &mut self,
184        source: WorktreeId,
185        destination: WorktreeId,
186        cx: &mut ModelContext<Self>,
187    ) -> Result<()> {
188        if source == destination {
189            return Ok(());
190        }
191
192        let mut source_index = None;
193        let mut destination_index = None;
194        for (i, worktree) in self.worktrees.iter().enumerate() {
195            if let Some(worktree) = worktree.upgrade() {
196                let worktree_id = worktree.read(cx).id();
197                if worktree_id == source {
198                    source_index = Some(i);
199                    if destination_index.is_some() {
200                        break;
201                    }
202                } else if worktree_id == destination {
203                    destination_index = Some(i);
204                    if source_index.is_some() {
205                        break;
206                    }
207                }
208            }
209        }
210
211        let source_index =
212            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
213        let destination_index =
214            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
215
216        if source_index == destination_index {
217            return Ok(());
218        }
219
220        let worktree_to_move = self.worktrees.remove(source_index);
221        self.worktrees.insert(destination_index, worktree_to_move);
222        self.worktrees_reordered = true;
223        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
224        cx.notify();
225        Ok(())
226    }
227
228    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
229        for worktree in &self.worktrees {
230            if let Some(worktree) = worktree.upgrade() {
231                worktree.update(cx, |worktree, _| {
232                    if let Some(worktree) = worktree.as_remote_mut() {
233                        worktree.disconnected_from_host();
234                    }
235                });
236            }
237        }
238    }
239
240    pub fn set_shared(&mut self, is_shared: bool, cx: &mut ModelContext<Self>) {
241        self.is_shared = is_shared;
242
243        // When shared, retain all worktrees
244        if is_shared {
245            for worktree_handle in self.worktrees.iter_mut() {
246                match worktree_handle {
247                    WorktreeHandle::Strong(_) => {}
248                    WorktreeHandle::Weak(worktree) => {
249                        if let Some(worktree) = worktree.upgrade() {
250                            *worktree_handle = WorktreeHandle::Strong(worktree);
251                        }
252                    }
253                }
254            }
255        }
256        // When not shared, only retain the visible worktrees
257        else {
258            for worktree_handle in self.worktrees.iter_mut() {
259                if let WorktreeHandle::Strong(worktree) = worktree_handle {
260                    let is_visible = worktree.update(cx, |worktree, _| {
261                        worktree.stop_observing_updates();
262                        worktree.is_visible()
263                    });
264                    if !is_visible {
265                        *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
266                    }
267                }
268            }
269        }
270    }
271
272    /// search over all worktrees and return buffers that *might* match the search.
273    pub fn find_search_candidates(
274        &self,
275        query: SearchQuery,
276        limit: usize,
277        open_entries: HashSet<ProjectEntryId>,
278        fs: Arc<dyn Fs>,
279        cx: &ModelContext<Self>,
280    ) -> Receiver<ProjectPath> {
281        let snapshots = self
282            .visible_worktrees(cx)
283            .filter_map(|tree| {
284                let tree = tree.read(cx);
285                Some((tree.snapshot(), tree.as_local()?.settings()))
286            })
287            .collect::<Vec<_>>();
288
289        let executor = cx.background_executor().clone();
290
291        // We want to return entries in the order they are in the worktrees, so we have one
292        // thread that iterates over the worktrees (and ignored directories) as necessary,
293        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
294        // channel.
295        // We spawn a number of workers that take items from the filter channel and check the query
296        // against the version of the file on disk.
297        let (filter_tx, filter_rx) = smol::channel::bounded(64);
298        let (output_tx, mut output_rx) = smol::channel::bounded(64);
299        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
300
301        let input = cx.background_executor().spawn({
302            let fs = fs.clone();
303            let query = query.clone();
304            async move {
305                Self::find_candidate_paths(
306                    fs,
307                    snapshots,
308                    open_entries,
309                    query,
310                    filter_tx,
311                    output_tx,
312                )
313                .await
314                .log_err();
315            }
316        });
317        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
318        let filters = cx.background_executor().spawn(async move {
319            let fs = &fs;
320            let query = &query;
321            executor
322                .scoped(move |scope| {
323                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
324                        let filter_rx = filter_rx.clone();
325                        scope.spawn(async move {
326                            Self::filter_paths(fs, filter_rx, query).await.log_err();
327                        })
328                    }
329                })
330                .await;
331        });
332        cx.background_executor()
333            .spawn(async move {
334                let mut matched = 0;
335                while let Some(mut receiver) = output_rx.next().await {
336                    let Some(path) = receiver.next().await else {
337                        continue;
338                    };
339                    let Ok(_) = matching_paths_tx.send(path).await else {
340                        break;
341                    };
342                    matched += 1;
343                    if matched == limit {
344                        break;
345                    }
346                }
347                drop(input);
348                drop(filters);
349            })
350            .detach();
351        return matching_paths_rx;
352    }
353
354    fn scan_ignored_dir<'a>(
355        fs: &'a Arc<dyn Fs>,
356        snapshot: &'a worktree::Snapshot,
357        path: &'a Path,
358        query: &'a SearchQuery,
359        include_root: bool,
360        filter_tx: &'a Sender<MatchingEntry>,
361        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
362    ) -> BoxFuture<'a, Result<()>> {
363        async move {
364            let abs_path = snapshot.abs_path().join(&path);
365            let Some(mut files) = fs
366                .read_dir(&abs_path)
367                .await
368                .with_context(|| format!("listing ignored path {abs_path:?}"))
369                .log_err()
370            else {
371                return Ok(());
372            };
373
374            let mut results = Vec::new();
375
376            while let Some(Ok(file)) = files.next().await {
377                let Some(metadata) = fs
378                    .metadata(&file)
379                    .await
380                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
381                    .log_err()
382                    .flatten()
383                else {
384                    continue;
385                };
386                if metadata.is_symlink || metadata.is_fifo {
387                    continue;
388                }
389                results.push((
390                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
391                    !metadata.is_dir,
392                ))
393            }
394            results.sort_by(|(a_path, a_is_file), (b_path, b_is_file)| {
395                compare_paths((a_path, *a_is_file), (b_path, *b_is_file))
396            });
397            for (path, is_file) in results {
398                if is_file {
399                    if query.filters_path() {
400                        let matched_path = if include_root {
401                            let mut full_path = PathBuf::from(snapshot.root_name());
402                            full_path.push(&path);
403                            query.file_matches(&full_path)
404                        } else {
405                            query.file_matches(&path)
406                        };
407                        if !matched_path {
408                            continue;
409                        }
410                    }
411                    let (tx, rx) = oneshot::channel();
412                    output_tx.send(rx).await?;
413                    filter_tx
414                        .send(MatchingEntry {
415                            respond: tx,
416                            worktree_path: snapshot.abs_path().clone(),
417                            path: ProjectPath {
418                                worktree_id: snapshot.id(),
419                                path: Arc::from(path),
420                            },
421                        })
422                        .await?;
423                } else {
424                    Self::scan_ignored_dir(
425                        fs,
426                        snapshot,
427                        &path,
428                        query,
429                        include_root,
430                        filter_tx,
431                        output_tx,
432                    )
433                    .await?;
434                }
435            }
436            Ok(())
437        }
438        .boxed()
439    }
440
441    async fn find_candidate_paths(
442        fs: Arc<dyn Fs>,
443        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
444        open_entries: HashSet<ProjectEntryId>,
445        query: SearchQuery,
446        filter_tx: Sender<MatchingEntry>,
447        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
448    ) -> Result<()> {
449        let include_root = snapshots.len() > 1;
450        for (snapshot, settings) in snapshots {
451            let mut entries: Vec<_> = snapshot.entries(query.include_ignored(), 0).collect();
452            entries.sort_by(|a, b| compare_paths((&a.path, a.is_file()), (&b.path, b.is_file())));
453            for entry in entries {
454                if entry.is_dir() && entry.is_ignored {
455                    if !settings.is_path_excluded(&entry.path) {
456                        Self::scan_ignored_dir(
457                            &fs,
458                            &snapshot,
459                            &entry.path,
460                            &query,
461                            include_root,
462                            &filter_tx,
463                            &output_tx,
464                        )
465                        .await?;
466                    }
467                    continue;
468                }
469
470                if entry.is_fifo || !entry.is_file() {
471                    continue;
472                }
473
474                if query.filters_path() {
475                    let matched_path = if include_root {
476                        let mut full_path = PathBuf::from(snapshot.root_name());
477                        full_path.push(&entry.path);
478                        query.file_matches(&full_path)
479                    } else {
480                        query.file_matches(&entry.path)
481                    };
482                    if !matched_path {
483                        continue;
484                    }
485                }
486
487                let (mut tx, rx) = oneshot::channel();
488
489                if open_entries.contains(&entry.id) {
490                    tx.send(ProjectPath {
491                        worktree_id: snapshot.id(),
492                        path: entry.path.clone(),
493                    })
494                    .await?;
495                } else {
496                    filter_tx
497                        .send(MatchingEntry {
498                            respond: tx,
499                            worktree_path: snapshot.abs_path().clone(),
500                            path: ProjectPath {
501                                worktree_id: snapshot.id(),
502                                path: entry.path.clone(),
503                            },
504                        })
505                        .await?;
506                }
507
508                output_tx.send(rx).await?;
509            }
510        }
511        Ok(())
512    }
513
514    async fn filter_paths(
515        fs: &Arc<dyn Fs>,
516        mut input: Receiver<MatchingEntry>,
517        query: &SearchQuery,
518    ) -> Result<()> {
519        while let Some(mut entry) = input.next().await {
520            let abs_path = entry.worktree_path.join(&entry.path.path);
521            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
522                continue;
523            };
524            if query.detect(file).unwrap_or(false) {
525                entry.respond.send(entry.path).await?
526            }
527        }
528
529        Ok(())
530    }
531
532    pub async fn handle_create_project_entry(
533        this: Model<Self>,
534        envelope: TypedEnvelope<proto::CreateProjectEntry>,
535        mut cx: AsyncAppContext,
536    ) -> Result<proto::ProjectEntryResponse> {
537        let worktree = this.update(&mut cx, |this, cx| {
538            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
539            this.worktree_for_id(worktree_id, cx)
540                .ok_or_else(|| anyhow!("worktree not found"))
541        })??;
542        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
543    }
544
545    pub async fn handle_rename_project_entry(
546        this: Model<Self>,
547        envelope: TypedEnvelope<proto::RenameProjectEntry>,
548        mut cx: AsyncAppContext,
549    ) -> Result<proto::ProjectEntryResponse> {
550        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
551        let worktree = this.update(&mut cx, |this, cx| {
552            this.worktree_for_entry(entry_id, cx)
553                .ok_or_else(|| anyhow!("worktree not found"))
554        })??;
555        Worktree::handle_rename_entry(worktree, envelope.payload, cx).await
556    }
557
558    pub async fn handle_copy_project_entry(
559        this: Model<Self>,
560        envelope: TypedEnvelope<proto::CopyProjectEntry>,
561        mut cx: AsyncAppContext,
562    ) -> Result<proto::ProjectEntryResponse> {
563        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
564        let worktree = this.update(&mut cx, |this, cx| {
565            this.worktree_for_entry(entry_id, cx)
566                .ok_or_else(|| anyhow!("worktree not found"))
567        })??;
568        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
569    }
570
571    pub async fn handle_delete_project_entry(
572        this: Model<Self>,
573        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
574        mut cx: AsyncAppContext,
575    ) -> Result<proto::ProjectEntryResponse> {
576        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
577        let worktree = this.update(&mut cx, |this, cx| {
578            this.worktree_for_entry(entry_id, cx)
579                .ok_or_else(|| anyhow!("worktree not found"))
580        })??;
581        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
582    }
583
584    pub async fn handle_expand_project_entry(
585        this: Model<Self>,
586        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
587        mut cx: AsyncAppContext,
588    ) -> Result<proto::ExpandProjectEntryResponse> {
589        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
590        let worktree = this
591            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
592            .ok_or_else(|| anyhow!("invalid request"))?;
593        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
594    }
595}
596
597#[derive(Clone)]
598enum WorktreeHandle {
599    Strong(Model<Worktree>),
600    Weak(WeakModel<Worktree>),
601}
602
603impl WorktreeHandle {
604    fn upgrade(&self) -> Option<Model<Worktree>> {
605        match self {
606            WorktreeHandle::Strong(handle) => Some(handle.clone()),
607            WorktreeHandle::Weak(handle) => handle.upgrade(),
608        }
609    }
610}