worktree_store.rs

  1use std::{
  2    collections::VecDeque,
  3    path::{Path, PathBuf},
  4    sync::Arc,
  5};
  6
  7use anyhow::{anyhow, Context as _, Result};
  8use collections::{HashMap, HashSet};
  9use fs::Fs;
 10use futures::SinkExt;
 11use gpui::{AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, WeakModel};
 12use postage::oneshot;
 13use rpc::{
 14    proto::{self, AnyProtoClient},
 15    TypedEnvelope,
 16};
 17use smol::{
 18    channel::{Receiver, Sender},
 19    stream::StreamExt,
 20};
 21use text::ReplicaId;
 22use util::ResultExt;
 23use worktree::{Entry, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
 24
 25use crate::{search::SearchQuery, ProjectPath};
 26
 27struct MatchingEntry {
 28    worktree_path: Arc<Path>,
 29    path: ProjectPath,
 30    respond: oneshot::Sender<ProjectPath>,
 31}
 32
 33pub struct WorktreeStore {
 34    is_shared: bool,
 35    worktrees: Vec<WorktreeHandle>,
 36    worktrees_reordered: bool,
 37}
 38
 39pub enum WorktreeStoreEvent {
 40    WorktreeAdded(Model<Worktree>),
 41    WorktreeRemoved(EntityId, WorktreeId),
 42    WorktreeOrderChanged,
 43}
 44
 45impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
 46
 47impl WorktreeStore {
 48    pub fn new(retain_worktrees: bool) -> Self {
 49        Self {
 50            is_shared: retain_worktrees,
 51            worktrees: Vec::new(),
 52            worktrees_reordered: false,
 53        }
 54    }
 55
 56    /// Iterates through all worktrees, including ones that don't appear in the project panel
 57    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Model<Worktree>> {
 58        self.worktrees
 59            .iter()
 60            .filter_map(move |worktree| worktree.upgrade())
 61    }
 62
 63    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
 64    pub fn visible_worktrees<'a>(
 65        &'a self,
 66        cx: &'a AppContext,
 67    ) -> impl 'a + DoubleEndedIterator<Item = Model<Worktree>> {
 68        self.worktrees()
 69            .filter(|worktree| worktree.read(cx).is_visible())
 70    }
 71
 72    pub fn worktree_for_id(&self, id: WorktreeId, cx: &AppContext) -> Option<Model<Worktree>> {
 73        self.worktrees()
 74            .find(|worktree| worktree.read(cx).id() == id)
 75    }
 76
 77    pub fn worktree_for_entry(
 78        &self,
 79        entry_id: ProjectEntryId,
 80        cx: &AppContext,
 81    ) -> Option<Model<Worktree>> {
 82        self.worktrees()
 83            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 84    }
 85
 86    pub fn entry_for_id<'a>(
 87        &'a self,
 88        entry_id: ProjectEntryId,
 89        cx: &'a AppContext,
 90    ) -> Option<&'a Entry> {
 91        self.worktrees()
 92            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
 93    }
 94
 95    pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 96        let push_strong_handle = self.is_shared || worktree.read(cx).is_visible();
 97        let handle = if push_strong_handle {
 98            WorktreeHandle::Strong(worktree.clone())
 99        } else {
100            WorktreeHandle::Weak(worktree.downgrade())
101        };
102        if self.worktrees_reordered {
103            self.worktrees.push(handle);
104        } else {
105            let i = match self
106                .worktrees
107                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
108                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
109                }) {
110                Ok(i) | Err(i) => i,
111            };
112            self.worktrees.insert(i, handle);
113        }
114
115        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
116
117        let handle_id = worktree.entity_id();
118        cx.observe_release(worktree, move |_, worktree, cx| {
119            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
120                handle_id,
121                worktree.id(),
122            ));
123        })
124        .detach();
125    }
126
127    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
128        self.worktrees.retain(|worktree| {
129            if let Some(worktree) = worktree.upgrade() {
130                if worktree.read(cx).id() == id_to_remove {
131                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
132                        worktree.entity_id(),
133                        id_to_remove,
134                    ));
135                    false
136                } else {
137                    true
138                }
139            } else {
140                false
141            }
142        });
143    }
144
145    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
146        self.worktrees_reordered = worktrees_reordered;
147    }
148
149    pub fn set_worktrees_from_proto(
150        &mut self,
151        worktrees: Vec<proto::WorktreeMetadata>,
152        replica_id: ReplicaId,
153        remote_id: u64,
154        client: AnyProtoClient,
155        cx: &mut ModelContext<Self>,
156    ) -> Result<()> {
157        let mut old_worktrees_by_id = self
158            .worktrees
159            .drain(..)
160            .filter_map(|worktree| {
161                let worktree = worktree.upgrade()?;
162                Some((worktree.read(cx).id(), worktree))
163            })
164            .collect::<HashMap<_, _>>();
165
166        for worktree in worktrees {
167            if let Some(old_worktree) =
168                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
169            {
170                self.worktrees.push(WorktreeHandle::Strong(old_worktree));
171            } else {
172                self.add(
173                    &Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx),
174                    cx,
175                );
176            }
177        }
178
179        Ok(())
180    }
181
182    pub fn move_worktree(
183        &mut self,
184        source: WorktreeId,
185        destination: WorktreeId,
186        cx: &mut ModelContext<Self>,
187    ) -> Result<()> {
188        if source == destination {
189            return Ok(());
190        }
191
192        let mut source_index = None;
193        let mut destination_index = None;
194        for (i, worktree) in self.worktrees.iter().enumerate() {
195            if let Some(worktree) = worktree.upgrade() {
196                let worktree_id = worktree.read(cx).id();
197                if worktree_id == source {
198                    source_index = Some(i);
199                    if destination_index.is_some() {
200                        break;
201                    }
202                } else if worktree_id == destination {
203                    destination_index = Some(i);
204                    if source_index.is_some() {
205                        break;
206                    }
207                }
208            }
209        }
210
211        let source_index =
212            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
213        let destination_index =
214            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
215
216        if source_index == destination_index {
217            return Ok(());
218        }
219
220        let worktree_to_move = self.worktrees.remove(source_index);
221        self.worktrees.insert(destination_index, worktree_to_move);
222        self.worktrees_reordered = true;
223        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
224        cx.notify();
225        Ok(())
226    }
227
228    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
229        for worktree in &self.worktrees {
230            if let Some(worktree) = worktree.upgrade() {
231                worktree.update(cx, |worktree, _| {
232                    if let Some(worktree) = worktree.as_remote_mut() {
233                        worktree.disconnected_from_host();
234                    }
235                });
236            }
237        }
238    }
239
240    pub fn set_shared(&mut self, is_shared: bool, cx: &mut ModelContext<Self>) {
241        self.is_shared = is_shared;
242
243        // When shared, retain all worktrees
244        if is_shared {
245            for worktree_handle in self.worktrees.iter_mut() {
246                match worktree_handle {
247                    WorktreeHandle::Strong(_) => {}
248                    WorktreeHandle::Weak(worktree) => {
249                        if let Some(worktree) = worktree.upgrade() {
250                            *worktree_handle = WorktreeHandle::Strong(worktree);
251                        }
252                    }
253                }
254            }
255        }
256        // When not shared, only retain the visible worktrees
257        else {
258            for worktree_handle in self.worktrees.iter_mut() {
259                if let WorktreeHandle::Strong(worktree) = worktree_handle {
260                    let is_visible = worktree.update(cx, |worktree, _| {
261                        worktree.stop_observing_updates();
262                        worktree.is_visible()
263                    });
264                    if !is_visible {
265                        *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
266                    }
267                }
268            }
269        }
270    }
271
272    /// search over all worktrees and return buffers that *might* match the search.
273    pub fn find_search_candidates(
274        &self,
275        query: SearchQuery,
276        limit: usize,
277        open_entries: HashSet<ProjectEntryId>,
278        fs: Arc<dyn Fs>,
279        cx: &ModelContext<Self>,
280    ) -> Receiver<ProjectPath> {
281        let snapshots = self
282            .visible_worktrees(cx)
283            .filter_map(|tree| {
284                let tree = tree.read(cx);
285                Some((tree.snapshot(), tree.as_local()?.settings()))
286            })
287            .collect::<Vec<_>>();
288
289        let executor = cx.background_executor().clone();
290
291        // We want to return entries in the order they are in the worktrees, so we have one
292        // thread that iterates over the worktrees (and ignored directories) as necessary,
293        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
294        // channel.
295        // We spawn a number of workers that take items from the filter channel and check the query
296        // against the version of the file on disk.
297        let (filter_tx, filter_rx) = smol::channel::bounded(64);
298        let (output_tx, mut output_rx) = smol::channel::bounded(64);
299        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
300
301        let input = cx.background_executor().spawn({
302            let fs = fs.clone();
303            let query = query.clone();
304            async move {
305                Self::find_candidate_paths(
306                    fs,
307                    snapshots,
308                    open_entries,
309                    query,
310                    filter_tx,
311                    output_tx,
312                )
313                .await
314                .log_err();
315            }
316        });
317        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
318        let filters = cx.background_executor().spawn(async move {
319            let fs = &fs;
320            let query = &query;
321            executor
322                .scoped(move |scope| {
323                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
324                        let filter_rx = filter_rx.clone();
325                        scope.spawn(async move {
326                            Self::filter_paths(fs, filter_rx, query).await.log_err();
327                        })
328                    }
329                })
330                .await;
331        });
332        cx.background_executor()
333            .spawn(async move {
334                let mut matched = 0;
335                while let Some(mut receiver) = output_rx.next().await {
336                    let Some(path) = receiver.next().await else {
337                        continue;
338                    };
339                    let Ok(_) = matching_paths_tx.send(path).await else {
340                        break;
341                    };
342                    matched += 1;
343                    if matched == limit {
344                        break;
345                    }
346                }
347                drop(input);
348                drop(filters);
349            })
350            .detach();
351        return matching_paths_rx;
352    }
353
354    async fn scan_ignored_dir(
355        fs: &Arc<dyn Fs>,
356        snapshot: &worktree::Snapshot,
357        path: &Path,
358        query: &SearchQuery,
359        filter_tx: &Sender<MatchingEntry>,
360        output_tx: &Sender<oneshot::Receiver<ProjectPath>>,
361    ) -> Result<()> {
362        let mut ignored_paths_to_process = VecDeque::from([snapshot.abs_path().join(&path)]);
363
364        while let Some(ignored_abs_path) = ignored_paths_to_process.pop_front() {
365            let metadata = fs
366                .metadata(&ignored_abs_path)
367                .await
368                .with_context(|| format!("fetching fs metadata for {ignored_abs_path:?}"))
369                .log_err()
370                .flatten();
371
372            let Some(fs_metadata) = metadata else {
373                continue;
374            };
375            if fs_metadata.is_dir {
376                let files = fs
377                    .read_dir(&ignored_abs_path)
378                    .await
379                    .with_context(|| format!("listing ignored path {ignored_abs_path:?}"))
380                    .log_err();
381
382                if let Some(mut subfiles) = files {
383                    while let Some(subfile) = subfiles.next().await {
384                        if let Some(subfile) = subfile.log_err() {
385                            ignored_paths_to_process.push_back(subfile);
386                        }
387                    }
388                }
389            } else if !fs_metadata.is_symlink {
390                if !query.file_matches(Some(&ignored_abs_path)) {
391                    continue;
392                }
393
394                let (tx, rx) = oneshot::channel();
395                output_tx.send(rx).await?;
396                filter_tx
397                    .send(MatchingEntry {
398                        respond: tx,
399                        worktree_path: snapshot.abs_path().clone(),
400                        path: ProjectPath {
401                            worktree_id: snapshot.id(),
402                            path: Arc::from(ignored_abs_path.strip_prefix(snapshot.abs_path())?),
403                        },
404                    })
405                    .await?;
406            }
407        }
408        Ok(())
409    }
410
411    async fn find_candidate_paths(
412        fs: Arc<dyn Fs>,
413        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
414        open_entries: HashSet<ProjectEntryId>,
415        query: SearchQuery,
416        filter_tx: Sender<MatchingEntry>,
417        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
418    ) -> Result<()> {
419        let include_root = snapshots.len() > 1;
420        for (snapshot, settings) in snapshots {
421            for entry in snapshot.entries(query.include_ignored(), 0) {
422                if entry.is_dir() && entry.is_ignored {
423                    if !settings.is_path_excluded(&entry.path) {
424                        Self::scan_ignored_dir(
425                            &fs,
426                            &snapshot,
427                            &entry.path,
428                            &query,
429                            &filter_tx,
430                            &output_tx,
431                        )
432                        .await?;
433                    }
434                    continue;
435                }
436
437                if entry.is_fifo || !entry.is_file() {
438                    continue;
439                }
440
441                if open_entries.contains(&entry.id) {
442                    let (mut tx, rx) = oneshot::channel();
443                    tx.send(ProjectPath {
444                        worktree_id: snapshot.id(),
445                        path: entry.path.clone(),
446                    })
447                    .await?;
448                    output_tx.send(rx).await?;
449                    continue;
450                }
451
452                if query.filters_path() {
453                    let matched_path = if include_root {
454                        let mut full_path = PathBuf::from(snapshot.root_name());
455                        full_path.push(&entry.path);
456                        query.file_matches(Some(&full_path))
457                    } else {
458                        query.file_matches(Some(&entry.path))
459                    };
460                    if !matched_path {
461                        continue;
462                    }
463                }
464
465                let (tx, rx) = oneshot::channel();
466                output_tx.send(rx).await?;
467                filter_tx
468                    .send(MatchingEntry {
469                        respond: tx,
470                        worktree_path: snapshot.abs_path().clone(),
471                        path: ProjectPath {
472                            worktree_id: snapshot.id(),
473                            path: entry.path.clone(),
474                        },
475                    })
476                    .await?;
477            }
478        }
479        Ok(())
480    }
481
482    async fn filter_paths(
483        fs: &Arc<dyn Fs>,
484        mut input: Receiver<MatchingEntry>,
485        query: &SearchQuery,
486    ) -> Result<()> {
487        while let Some(mut entry) = input.next().await {
488            let abs_path = entry.worktree_path.join(&entry.path.path);
489            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
490                continue;
491            };
492            if query.detect(file).unwrap_or(false) {
493                entry.respond.send(entry.path).await?
494            }
495        }
496
497        Ok(())
498    }
499
500    pub async fn handle_create_project_entry(
501        this: Model<Self>,
502        envelope: TypedEnvelope<proto::CreateProjectEntry>,
503        mut cx: AsyncAppContext,
504    ) -> Result<proto::ProjectEntryResponse> {
505        let worktree = this.update(&mut cx, |this, cx| {
506            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
507            this.worktree_for_id(worktree_id, cx)
508                .ok_or_else(|| anyhow!("worktree not found"))
509        })??;
510        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
511    }
512
513    pub async fn handle_rename_project_entry(
514        this: Model<Self>,
515        envelope: TypedEnvelope<proto::RenameProjectEntry>,
516        mut cx: AsyncAppContext,
517    ) -> Result<proto::ProjectEntryResponse> {
518        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
519        let worktree = this.update(&mut cx, |this, cx| {
520            this.worktree_for_entry(entry_id, cx)
521                .ok_or_else(|| anyhow!("worktree not found"))
522        })??;
523        Worktree::handle_rename_entry(worktree, envelope.payload, cx).await
524    }
525
526    pub async fn handle_copy_project_entry(
527        this: Model<Self>,
528        envelope: TypedEnvelope<proto::CopyProjectEntry>,
529        mut cx: AsyncAppContext,
530    ) -> Result<proto::ProjectEntryResponse> {
531        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
532        let worktree = this.update(&mut cx, |this, cx| {
533            this.worktree_for_entry(entry_id, cx)
534                .ok_or_else(|| anyhow!("worktree not found"))
535        })??;
536        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
537    }
538
539    pub async fn handle_delete_project_entry(
540        this: Model<Self>,
541        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
542        mut cx: AsyncAppContext,
543    ) -> Result<proto::ProjectEntryResponse> {
544        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
545        let worktree = this.update(&mut cx, |this, cx| {
546            this.worktree_for_entry(entry_id, cx)
547                .ok_or_else(|| anyhow!("worktree not found"))
548        })??;
549        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
550    }
551
552    pub async fn handle_expand_project_entry(
553        this: Model<Self>,
554        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
555        mut cx: AsyncAppContext,
556    ) -> Result<proto::ExpandProjectEntryResponse> {
557        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
558        let worktree = this
559            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
560            .ok_or_else(|| anyhow!("invalid request"))?;
561        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
562    }
563}
564
565#[derive(Clone)]
566enum WorktreeHandle {
567    Strong(Model<Worktree>),
568    Weak(WeakModel<Worktree>),
569}
570
571impl WorktreeHandle {
572    fn upgrade(&self) -> Option<Model<Worktree>> {
573        match self {
574            WorktreeHandle::Strong(handle) => Some(handle.clone()),
575            WorktreeHandle::Weak(handle) => handle.upgrade(),
576        }
577    }
578}