Got basic replication working :)

Mikayla Maki created

Change summary

crates/collab/src/db.rs                      |   2 
crates/collab/src/tests/integration_tests.rs |  49 +++++++-
crates/fs/src/fs.rs                          |   6 
crates/fs/src/repository.rs                  |   2 
crates/project/src/worktree.rs               | 129 +++++++++++++++++++--
crates/rpc/src/proto.rs                      |  65 +++++++++-
crates/sum_tree/src/tree_map.rs              |  44 ++++++
7 files changed, 256 insertions(+), 41 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -1570,7 +1570,6 @@ impl Database {
                                     branch: db_repository.branch,
                                     removed_worktree_repo_paths: Default::default(),
                                     updated_worktree_statuses: Default::default(),
-
                                 });
                             }
                         }
@@ -2653,7 +2652,6 @@ impl Database {
                             branch: db_repository_entry.branch,
                             removed_worktree_repo_paths: Default::default(),
                             updated_worktree_statuses: Default::default(),
-
                         });
                     }
                 }

crates/collab/src/tests/integration_tests.rs 🔗

@@ -2748,7 +2748,12 @@ async fn test_git_status_sync(
     deterministic.run_until_parked();
 
     #[track_caller]
-    fn assert_status(file: &impl AsRef<Path>, status: Option<GitFileStatus>, project: &Project, cx: &AppContext) {
+    fn assert_status(
+        file: &impl AsRef<Path>,
+        status: Option<GitFileStatus>,
+        project: &Project,
+        cx: &AppContext,
+    ) {
         let file = file.as_ref();
         let worktrees = project.visible_worktrees(cx).collect::<Vec<_>>();
         assert_eq!(worktrees.len(), 1);
@@ -2785,19 +2790,49 @@ async fn test_git_status_sync(
 
     // Smoke test status reading
     project_local.read_with(cx_a, |project, cx| {
-        assert_status(&Path::new(A_TXT), Some(GitFileStatus::Added), project, cx);
-        assert_status(&Path::new(B_TXT), Some(GitFileStatus::Added), project, cx);
+        assert_status(
+            &Path::new(A_TXT),
+            Some(GitFileStatus::Modified),
+            project,
+            cx,
+        );
+        assert_status(
+            &Path::new(B_TXT),
+            Some(GitFileStatus::Modified),
+            project,
+            cx,
+        );
     });
     project_remote.read_with(cx_b, |project, cx| {
-        assert_status(&Path::new(A_TXT), Some(GitFileStatus::Added), project, cx);
-        assert_status(&Path::new(B_TXT), Some(GitFileStatus::Added), project, cx);
+        assert_status(
+            &Path::new(A_TXT),
+            Some(GitFileStatus::Modified),
+            project,
+            cx,
+        );
+        assert_status(
+            &Path::new(B_TXT),
+            Some(GitFileStatus::Modified),
+            project,
+            cx,
+        );
     });
 
     // And synchronization while joining
     let project_remote_c = client_c.build_remote_project(project_id, cx_c).await;
     project_remote_c.read_with(cx_c, |project, cx| {
-        assert_status(&Path::new(A_TXT), Some(GitFileStatus::Added), project, cx);
-        assert_status(&Path::new(B_TXT), Some(GitFileStatus::Added), project, cx);
+        assert_status(
+            &Path::new(A_TXT),
+            Some(GitFileStatus::Modified),
+            project,
+            cx,
+        );
+        assert_status(
+            &Path::new(B_TXT),
+            Some(GitFileStatus::Modified),
+            project,
+            cx,
+        );
     });
 }
 

crates/fs/src/fs.rs 🔗

@@ -7,7 +7,7 @@ use git2::Repository as LibGitRepository;
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
 use regex::Regex;
-use repository::{GitRepository, GitFileStatus};
+use repository::{GitFileStatus, GitRepository};
 use rope::Rope;
 use smol::io::{AsyncReadExt, AsyncWriteExt};
 use std::borrow::Cow;
@@ -660,9 +660,7 @@ impl FakeFs {
             state.worktree_statuses.extend(
                 statuses
                     .iter()
-                    .map(|(path, content)| {
-                        ((**path).into(), content.clone())
-                    }),
+                    .map(|(path, content)| ((**path).into(), content.clone())),
             );
         });
     }

crates/fs/src/repository.rs 🔗

@@ -194,7 +194,7 @@ pub enum GitFileStatus {
 pub struct RepoPath(PathBuf);
 
 impl RepoPath {
-    fn new(path: PathBuf) -> Self {
+    pub fn new(path: PathBuf) -> Self {
         debug_assert!(path.is_relative(), "Repo paths must be relative");
 
         RepoPath(path)

crates/project/src/worktree.rs 🔗

@@ -46,7 +46,6 @@ use std::{
     future::Future,
     mem,
     ops::{Deref, DerefMut},
-
     path::{Path, PathBuf},
     pin::Pin,
     sync::{
@@ -147,6 +146,14 @@ pub struct RepositoryEntry {
     pub(crate) worktree_statuses: TreeMap<RepoPath, GitFileStatus>,
 }
 
+fn read_git_status(git_status: i32) -> Option<GitFileStatus> {
+    proto::GitStatus::from_i32(git_status).map(|status| match status {
+        proto::GitStatus::Added => GitFileStatus::Added,
+        proto::GitStatus::Modified => GitFileStatus::Modified,
+        proto::GitStatus::Conflict => GitFileStatus::Conflict,
+    })
+}
+
 impl RepositoryEntry {
     pub fn branch(&self) -> Option<Arc<str>> {
         self.branch.clone()
@@ -172,6 +179,70 @@ impl RepositoryEntry {
             .and_then(|repo_path| self.worktree_statuses.get(&repo_path))
             .cloned()
     }
+
+    pub fn build_update(&self, other: &Self) -> proto::RepositoryEntry {
+        let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
+        let mut removed_statuses: Vec<String> = Vec::new();
+
+        let mut self_statuses = self.worktree_statuses.iter().peekable();
+        let mut other_statuses = other.worktree_statuses.iter().peekable();
+        loop {
+            match (self_statuses.peek(), other_statuses.peek()) {
+                (Some((self_repo_path, self_status)), Some((other_repo_path, other_status))) => {
+                    match Ord::cmp(self_repo_path, other_repo_path) {
+                        Ordering::Less => {
+                            updated_statuses.push(make_status_entry(self_repo_path, self_status));
+                            self_statuses.next();
+                        }
+                        Ordering::Equal => {
+                            if self_status != other_status {
+                                updated_statuses
+                                    .push(make_status_entry(self_repo_path, self_status));
+                            }
+
+                            self_statuses.next();
+                            other_statuses.next();
+                        }
+                        Ordering::Greater => {
+                            removed_statuses.push(make_repo_path(other_repo_path));
+                            other_statuses.next();
+                        }
+                    }
+                }
+                (Some((self_repo_path, self_status)), None) => {
+                    updated_statuses.push(make_status_entry(self_repo_path, self_status));
+                    self_statuses.next();
+                }
+                (None, Some((other_repo_path, _))) => {
+                    removed_statuses.push(make_repo_path(other_repo_path));
+                    other_statuses.next();
+                }
+                (None, None) => break,
+            }
+        }
+
+        proto::RepositoryEntry {
+            work_directory_id: self.work_directory_id().to_proto(),
+            branch: self.branch.as_ref().map(|str| str.to_string()),
+            removed_worktree_repo_paths: removed_statuses,
+            updated_worktree_statuses: updated_statuses,
+        }
+    }
+}
+
+fn make_repo_path(path: &RepoPath) -> String {
+    path.as_os_str().to_string_lossy().to_string()
+}
+
+fn make_status_entry(path: &RepoPath, status: &GitFileStatus) -> proto::StatusEntry {
+    proto::StatusEntry {
+        repo_path: make_repo_path(path),
+        status: match status {
+            GitFileStatus::Added => proto::GitStatus::Added.into(),
+            GitFileStatus::Modified => proto::GitStatus::Modified.into(),
+            GitFileStatus::Conflict => proto::GitStatus::Conflict.into(),
+        },
+    }
 }
 
 impl From<&RepositoryEntry> for proto::RepositoryEntry {
@@ -179,9 +250,12 @@ impl From<&RepositoryEntry> for proto::RepositoryEntry {
         proto::RepositoryEntry {
             work_directory_id: value.work_directory.to_proto(),
             branch: value.branch.as_ref().map(|str| str.to_string()),
-            // TODO: Status
+            updated_worktree_statuses: value
+                .worktree_statuses
+                .iter()
+                .map(|(repo_path, status)| make_status_entry(repo_path, status))
+                .collect(),
             removed_worktree_repo_paths: Default::default(),
-            updated_worktree_statuses: Default::default(),
         }
     }
 }
@@ -1442,15 +1516,41 @@ impl Snapshot {
         });
 
         for repository in update.updated_repositories {
-            let repository = RepositoryEntry {
-                work_directory: ProjectEntryId::from_proto(repository.work_directory_id).into(),
-                branch: repository.branch.map(Into::into),
-                // TODO: status
-                worktree_statuses: Default::default(),
-            };
-            if let Some(entry) = self.entry_for_id(repository.work_directory_id()) {
-                self.repository_entries
-                    .insert(RepositoryWorkDirectory(entry.path.clone()), repository)
+            let work_directory_entry: WorkDirectoryEntry =
+                ProjectEntryId::from_proto(repository.work_directory_id).into();
+
+            if let Some(entry) = self.entry_for_id(*work_directory_entry) {
+                let mut statuses = TreeMap::default();
+                for status_entry in repository.updated_worktree_statuses {
+                    let Some(git_file_status) = read_git_status(status_entry.status) else {
+                        continue;
+                    };
+
+                    let repo_path = RepoPath::new(status_entry.repo_path.into());
+                    statuses.insert(repo_path, git_file_status);
+                }
+
+                let work_directory = RepositoryWorkDirectory(entry.path.clone());
+                if self.repository_entries.get(&work_directory).is_some() {
+                    self.repository_entries.update(&work_directory, |repo| {
+                        repo.branch = repository.branch.map(Into::into);
+                        repo.worktree_statuses.insert_tree(statuses);
+
+                        for repo_path in repository.removed_worktree_repo_paths {
+                            let repo_path = RepoPath::new(repo_path.into());
+                            repo.worktree_statuses.remove(&repo_path);
+                        }
+                    });
+                } else {
+                    self.repository_entries.insert(
+                        work_directory,
+                        RepositoryEntry {
+                            work_directory: work_directory_entry,
+                            branch: repository.branch.map(Into::into),
+                            worktree_statuses: statuses,
+                        },
+                    )
+                }
             } else {
                 log::error!("no work directory entry for repository {:?}", repository)
             }
@@ -1598,8 +1698,7 @@ impl LocalSnapshot {
         &self,
         path: &Path,
     ) -> Option<(&ProjectEntryId, &LocalRepositoryEntry)> {
-        self
-            .git_repositories
+        self.git_repositories
             .iter()
             .find(|(_, repo)| repo.in_dot_git(path))
     }
@@ -1691,7 +1790,7 @@ impl LocalSnapshot {
                         }
                         Ordering::Equal => {
                             if self_repo != other_repo {
-                                updated_repositories.push((*self_repo).into());
+                                updated_repositories.push(self_repo.build_update(other_repo));
                             }
 
                             self_repos.next();

crates/rpc/src/proto.rs 🔗

@@ -484,9 +484,11 @@ pub fn split_worktree_update(
     mut message: UpdateWorktree,
     max_chunk_size: usize,
 ) -> impl Iterator<Item = UpdateWorktree> {
-    let mut done = false;
+    let mut done_files = false;
+    let mut done_statuses = false;
+    let mut repository_index = 0;
     iter::from_fn(move || {
-        if done {
+        if done_files && done_statuses {
             return None;
         }
 
@@ -502,22 +504,71 @@ pub fn split_worktree_update(
             .drain(..removed_entries_chunk_size)
             .collect();
 
-        done = message.updated_entries.is_empty() && message.removed_entries.is_empty();
+        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
 
         // Wait to send repositories until after we've guaranteed that their associated entries
         // will be read
-        let updated_repositories = if done {
-            mem::take(&mut message.updated_repositories)
+        let updated_repositories = if done_files {
+            let mut total_statuses = 0;
+            let mut updated_repositories = Vec::new();
+            while total_statuses < max_chunk_size
+                && repository_index < message.updated_repositories.len()
+            {
+                let updated_statuses_chunk_size = cmp::min(
+                    message.updated_repositories[repository_index]
+                        .updated_worktree_statuses
+                        .len(),
+                    max_chunk_size - total_statuses,
+                );
+
+                let updated_statuses: Vec<_> = message.updated_repositories[repository_index]
+                    .updated_worktree_statuses
+                    .drain(..updated_statuses_chunk_size)
+                    .collect();
+
+                total_statuses += updated_statuses.len();
+
+                let done_this_repo = message.updated_repositories[repository_index]
+                    .updated_worktree_statuses
+                    .is_empty();
+
+                let removed_repo_paths = if done_this_repo {
+                    mem::take(
+                        &mut message.updated_repositories[repository_index]
+                            .removed_worktree_repo_paths,
+                    )
+                } else {
+                    Default::default()
+                };
+
+                updated_repositories.push(RepositoryEntry {
+                    work_directory_id: message.updated_repositories[repository_index]
+                        .work_directory_id,
+                    branch: message.updated_repositories[repository_index]
+                        .branch
+                        .clone(),
+                    updated_worktree_statuses: updated_statuses,
+                    removed_worktree_repo_paths: removed_repo_paths,
+                });
+
+                if done_this_repo {
+                    repository_index += 1;
+                }
+            }
+
+            updated_repositories
         } else {
             Default::default()
         };
 
-        let removed_repositories = if done {
+        let removed_repositories = if done_files && done_statuses {
             mem::take(&mut message.removed_repositories)
         } else {
             Default::default()
         };
 
+        done_statuses = repository_index >= message.updated_repositories.len();
+
         Some(UpdateWorktree {
             project_id: message.project_id,
             worktree_id: message.worktree_id,
@@ -526,7 +577,7 @@ pub fn split_worktree_update(
             updated_entries,
             removed_entries,
             scan_id: message.scan_id,
-            is_last_update: done && message.is_last_update,
+            is_last_update: done_files && message.is_last_update,
             updated_repositories,
             removed_repositories,
         })

crates/sum_tree/src/tree_map.rs 🔗

@@ -1,6 +1,6 @@
 use std::{cmp::Ordering, fmt::Debug};
 
-use crate::{Bias, Dimension, Item, KeyedItem, SeekTarget, SumTree, Summary};
+use crate::{Bias, Dimension, Edit, Item, KeyedItem, SeekTarget, SumTree, Summary};
 
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct TreeMap<K, V>(SumTree<MapEntry<K, V>>)
@@ -82,8 +82,7 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
         cursor.item().map(|item| (&item.key, &item.value))
     }
 
-    pub fn remove_between(&mut self, from: &K, until: &K)
-    {
+    pub fn remove_between(&mut self, from: &K, until: &K) {
         let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
         let from_key = MapKeyRef(Some(from));
         let mut new_tree = cursor.slice(&from_key, Bias::Left, &());
@@ -95,7 +94,8 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
     }
 
     pub fn remove_from_while<F>(&mut self, from: &K, mut f: F)
-    where F: FnMut(&K, &V) -> bool
+    where
+        F: FnMut(&K, &V) -> bool,
     {
         let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
         let from_key = MapKeyRef(Some(from));
@@ -111,7 +111,6 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
         self.0 = new_tree;
     }
 
-
     pub fn update<F, T>(&mut self, key: &K, f: F) -> Option<T>
     where
         F: FnOnce(&mut V) -> T,
@@ -155,6 +154,20 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
     pub fn values(&self) -> impl Iterator<Item = &V> + '_ {
         self.0.iter().map(|entry| &entry.value)
     }
+
+    pub fn insert_tree(&mut self, other: TreeMap<K, V>) {
+        let edits = other
+            .iter()
+            .map(|(key, value)| {
+                Edit::Insert(MapEntry {
+                    key: key.to_owned(),
+                    value: value.to_owned(),
+                })
+            })
+            .collect();
+
+        self.0.edit(edits, &());
+    }
 }
 
 impl<K, V> Default for TreeMap<K, V>
@@ -340,4 +353,25 @@ mod tests {
         assert_eq!(map.get(&"baaaab"), None);
         assert_eq!(map.get(&"c"), Some(&5));
     }
+
+    #[test]
+    fn test_insert_tree() {
+        let mut map = TreeMap::default();
+        map.insert("a", 1);
+        map.insert("b", 2);
+        map.insert("c", 3);
+
+        let mut other = TreeMap::default();
+        other.insert("a", 2);
+        other.insert("b", 2);
+        other.insert("d", 4);
+
+        map.insert_tree(other);
+
+        assert_eq!(map.iter().count(), 4);
+        assert_eq!(map.get(&"a"), Some(&2));
+        assert_eq!(map.get(&"b"), Some(&2));
+        assert_eq!(map.get(&"c"), Some(&3));
+        assert_eq!(map.get(&"d"), Some(&4));
+    }
 }