Split worktree updates when a peer joins an already-shared project

Antonio Scandurra created

Change summary

crates/collab/src/integration_tests.rs |   1 
crates/collab/src/rpc.rs               |  63 +++++++---
crates/project/src/project.rs          |  52 ++++----
crates/project/src/worktree.rs         | 167 ++++++---------------------
crates/rpc/proto/zed.proto             |  12 -
crates/rpc/src/proto.rs                |  26 ++++
6 files changed, 137 insertions(+), 184 deletions(-)

Detailed changes

crates/collab/src/integration_tests.rs 🔗

@@ -1472,6 +1472,7 @@ async fn test_collaborating_with_diagnostics(
 
     // Join project as client C and observe the diagnostics.
     let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
+    deterministic.run_until_parked();
     project_c.read_with(cx_c, |project, cx| {
         assert_eq!(
             project.diagnostic_summaries(cx).collect::<Vec<_>>(),

crates/collab/src/rpc.rs 🔗

@@ -791,21 +791,10 @@ impl Server {
             let worktrees = project
                 .worktrees
                 .iter()
-                .filter_map(|(id, shared_worktree)| {
-                    let worktree = project.worktrees.get(&id)?;
-                    Some(proto::Worktree {
-                        id: *id,
-                        root_name: worktree.root_name.clone(),
-                        entries: shared_worktree.entries.values().cloned().collect(),
-                        diagnostic_summaries: shared_worktree
-                            .diagnostic_summaries
-                            .values()
-                            .cloned()
-                            .collect(),
-                        visible: worktree.visible,
-                        scan_id: shared_worktree.scan_id,
-                        is_complete: worktree.is_complete,
-                    })
+                .map(|(id, worktree)| proto::WorktreeMetadata {
+                    id: *id,
+                    root_name: worktree.root_name.clone(),
+                    visible: worktree.visible,
                 })
                 .collect::<Vec<_>>();
 
@@ -841,14 +830,15 @@ impl Server {
                 }
             }
 
-            for (receipt, replica_id) in receipts_with_replica_ids {
+            // First, we send the metadata associated with each worktree.
+            for (receipt, replica_id) in &receipts_with_replica_ids {
                 self.peer.respond(
-                    receipt,
+                    receipt.clone(),
                     proto::JoinProjectResponse {
                         variant: Some(proto::join_project_response::Variant::Accept(
                             proto::join_project_response::Accept {
                                 worktrees: worktrees.clone(),
-                                replica_id: replica_id as u32,
+                                replica_id: *replica_id as u32,
                                 collaborators: collaborators.clone(),
                                 language_servers: project.language_servers.clone(),
                             },
@@ -856,6 +846,43 @@ impl Server {
                     },
                 )?;
             }
+
+            for (worktree_id, worktree) in &project.worktrees {
+                #[cfg(any(test, feature = "test-support"))]
+                const MAX_CHUNK_SIZE: usize = 2;
+                #[cfg(not(any(test, feature = "test-support")))]
+                const MAX_CHUNK_SIZE: usize = 256;
+
+                // Stream this worktree's entries.
+                let message = proto::UpdateWorktree {
+                    project_id: project_id.to_proto(),
+                    worktree_id: *worktree_id,
+                    root_name: worktree.root_name.clone(),
+                    updated_entries: worktree.entries.values().cloned().collect(),
+                    removed_entries: Default::default(),
+                    scan_id: worktree.scan_id,
+                    is_last_update: worktree.is_complete,
+                };
+                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
+                    for (receipt, _) in &receipts_with_replica_ids {
+                        self.peer.send(receipt.sender_id, update.clone())?;
+                    }
+                }
+
+                // Stream this worktree's diagnostics.
+                for summary in worktree.diagnostic_summaries.values() {
+                    for (receipt, _) in &receipts_with_replica_ids {
+                        self.peer.send(
+                            receipt.sender_id,
+                            proto::UpdateDiagnosticSummary {
+                                project_id: project_id.to_proto(),
+                                worktree_id: *worktree_id,
+                                summary: Some(summary.clone()),
+                            },
+                        )?;
+                    }
+                }
+            }
         }
 
         self.update_user_contacts(host_user_id).await?;

crates/project/src/project.rs 🔗

@@ -487,10 +487,9 @@ impl Project {
 
         let mut worktrees = Vec::new();
         for worktree in response.worktrees {
-            let (worktree, load_task) = cx
+            let worktree = cx
                 .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx));
             worktrees.push(worktree);
-            load_task.detach();
         }
 
         let (opened_buffer_tx, opened_buffer_rx) = watch::channel();
@@ -4441,19 +4440,9 @@ impl Project {
                 {
                     this.worktrees.push(WorktreeHandle::Strong(old_worktree));
                 } else {
-                    let worktree = proto::Worktree {
-                        id: worktree.id,
-                        root_name: worktree.root_name,
-                        entries: Default::default(),
-                        diagnostic_summaries: Default::default(),
-                        visible: worktree.visible,
-                        scan_id: 0,
-                        is_complete: false,
-                    };
-                    let (worktree, load_task) =
+                    let worktree =
                         Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx);
                     this.add_worktree(&worktree, cx);
-                    load_task.detach();
                 }
             }
 
@@ -4477,8 +4466,8 @@ impl Project {
             if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
                 worktree.update(cx, |worktree, _| {
                     let worktree = worktree.as_remote_mut().unwrap();
-                    worktree.update_from_remote(envelope)
-                })?;
+                    worktree.update_from_remote(envelope.payload);
+                });
             }
             Ok(())
         })
@@ -7996,7 +7985,10 @@ mod tests {
     }
 
     #[gpui::test(retries = 5)]
-    async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) {
+    async fn test_rescan_and_remote_updates(
+        deterministic: Arc<Deterministic>,
+        cx: &mut gpui::TestAppContext,
+    ) {
         let dir = temp_tree(json!({
             "a": {
                 "file1": "",
@@ -8040,17 +8032,24 @@ mod tests {
         // Create a remote copy of this worktree.
         let tree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap());
         let initial_snapshot = tree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
-        let (remote, load_task) = cx.update(|cx| {
+        let remote = cx.update(|cx| {
             Worktree::remote(
                 1,
                 1,
-                initial_snapshot.to_proto(&Default::default(), true),
+                proto::WorktreeMetadata {
+                    id: initial_snapshot.id().to_proto(),
+                    root_name: initial_snapshot.root_name().into(),
+                    visible: true,
+                },
                 rpc.clone(),
                 cx,
             )
         });
-        // tree
-        load_task.await;
+        remote.update(cx, |remote, _| {
+            let update = initial_snapshot.build_initial_update(1);
+            remote.as_remote_mut().unwrap().update_from_remote(update);
+        });
+        deterministic.run_until_parked();
 
         cx.read(|cx| {
             assert!(!buffer2.read(cx).is_dirty());
@@ -8116,19 +8115,16 @@ mod tests {
         // Update the remote worktree. Check that it becomes consistent with the
         // local worktree.
         remote.update(cx, |remote, cx| {
-            let update_message = tree.read(cx).as_local().unwrap().snapshot().build_update(
+            let update = tree.read(cx).as_local().unwrap().snapshot().build_update(
                 &initial_snapshot,
                 1,
                 1,
                 true,
             );
-            remote
-                .as_remote_mut()
-                .unwrap()
-                .snapshot
-                .apply_remote_update(update_message)
-                .unwrap();
-
+            remote.as_remote_mut().unwrap().update_from_remote(update);
+        });
+        deterministic.run_until_parked();
+        remote.read_with(cx, |remote, _| {
             assert_eq!(
                 remote
                     .paths()

crates/project/src/worktree.rs 🔗

@@ -7,7 +7,7 @@ use super::{
 };
 use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
 use anyhow::{anyhow, Context, Result};
-use client::{proto, Client, TypedEnvelope};
+use client::{proto, Client};
 use clock::ReplicaId;
 use collections::HashMap;
 use futures::{
@@ -40,7 +40,6 @@ use std::{
     ffi::{OsStr, OsString},
     fmt,
     future::Future,
-    mem,
     ops::{Deref, DerefMut},
     os::unix::prelude::{OsStrExt, OsStringExt},
     path::{Path, PathBuf},
@@ -173,10 +172,10 @@ impl Worktree {
     pub fn remote(
         project_remote_id: u64,
         replica_id: ReplicaId,
-        worktree: proto::Worktree,
+        worktree: proto::WorktreeMetadata,
         client: Arc<Client>,
         cx: &mut MutableAppContext,
-    ) -> (ModelHandle<Self>, Task<()>) {
+    ) -> ModelHandle<Self> {
         let remote_id = worktree.id;
         let root_char_bag: CharBag = worktree
             .root_name
@@ -191,8 +190,8 @@ impl Worktree {
             root_char_bag,
             entries_by_path: Default::default(),
             entries_by_id: Default::default(),
-            scan_id: worktree.scan_id as usize,
-            is_complete: worktree.is_complete,
+            scan_id: 0,
+            is_complete: false,
         };
 
         let (updates_tx, mut updates_rx) = mpsc::unbounded();
@@ -207,90 +206,37 @@ impl Worktree {
                 updates_tx: Some(updates_tx),
                 snapshot_updated_rx: snapshot_updated_rx.clone(),
                 client: client.clone(),
-                diagnostic_summaries: TreeMap::from_ordered_entries(
-                    worktree.diagnostic_summaries.into_iter().map(|summary| {
-                        (
-                            PathKey(PathBuf::from(summary.path).into()),
-                            DiagnosticSummary {
-                                language_server_id: summary.language_server_id as usize,
-                                error_count: summary.error_count as usize,
-                                warning_count: summary.warning_count as usize,
-                            },
-                        )
-                    }),
-                ),
+                diagnostic_summaries: Default::default(),
                 visible,
             })
         });
 
-        let deserialize_task = cx.spawn({
-            let worktree_handle = worktree_handle.clone();
-            |cx| async move {
-                let (entries_by_path, entries_by_id) = cx
-                    .background()
-                    .spawn(async move {
-                        let mut entries_by_path_edits = Vec::new();
-                        let mut entries_by_id_edits = Vec::new();
-                        for entry in worktree.entries {
-                            match Entry::try_from((&root_char_bag, entry)) {
-                                Ok(entry) => {
-                                    entries_by_id_edits.push(Edit::Insert(PathEntry {
-                                        id: entry.id,
-                                        path: entry.path.clone(),
-                                        is_ignored: entry.is_ignored,
-                                        scan_id: 0,
-                                    }));
-                                    entries_by_path_edits.push(Edit::Insert(entry));
-                                }
-                                Err(err) => log::warn!("error for remote worktree entry {:?}", err),
-                            }
-                        }
-
-                        let mut entries_by_path = SumTree::new();
-                        let mut entries_by_id = SumTree::new();
-                        entries_by_path.edit(entries_by_path_edits, &());
-                        entries_by_id.edit(entries_by_id_edits, &());
-
-                        (entries_by_path, entries_by_id)
-                    })
-                    .await;
-
-                {
-                    let mut snapshot = background_snapshot.lock();
-                    snapshot.entries_by_path = entries_by_path;
-                    snapshot.entries_by_id = entries_by_id;
+        cx.background()
+            .spawn(async move {
+                while let Some(update) = updates_rx.next().await {
+                    if let Err(error) = background_snapshot.lock().apply_remote_update(update) {
+                        log::error!("error applying worktree update: {}", error);
+                    }
                     snapshot_updated_tx.send(()).await.ok();
                 }
+            })
+            .detach();
 
-                cx.background()
-                    .spawn(async move {
-                        while let Some(update) = updates_rx.next().await {
-                            if let Err(error) =
-                                background_snapshot.lock().apply_remote_update(update)
-                            {
-                                log::error!("error applying worktree update: {}", error);
-                            }
-                            snapshot_updated_tx.send(()).await.ok();
-                        }
-                    })
-                    .detach();
-
-                cx.spawn(|mut cx| {
-                    let this = worktree_handle.downgrade();
-                    async move {
-                        while let Some(_) = snapshot_updated_rx.recv().await {
-                            if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
-                            } else {
-                                break;
-                            }
-                        }
+        cx.spawn(|mut cx| {
+            let this = worktree_handle.downgrade();
+            async move {
+                while let Some(_) = snapshot_updated_rx.recv().await {
+                    if let Some(this) = this.upgrade(&cx) {
+                        this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
+                    } else {
+                        break;
                     }
-                })
-                .detach();
+                }
             }
-        });
-        (worktree_handle, deserialize_task)
+        })
+        .detach();
+
+        worktree_handle
     }
 
     pub fn as_local(&self) -> Option<&LocalWorktree> {
@@ -1015,16 +961,12 @@ impl RemoteWorktree {
         self.updates_tx.take();
     }
 
-    pub fn update_from_remote(
-        &mut self,
-        envelope: TypedEnvelope<proto::UpdateWorktree>,
-    ) -> Result<()> {
+    pub fn update_from_remote(&mut self, update: proto::UpdateWorktree) {
         if let Some(updates_tx) = &self.updates_tx {
             updates_tx
-                .unbounded_send(envelope.payload)
+                .unbounded_send(update)
                 .expect("consumer runs to completion");
         }
-        Ok(())
     }
 
     fn wait_for_snapshot(
@@ -1162,7 +1104,7 @@ impl Snapshot {
         for entry_id in update.removed_entries {
             let entry = self
                 .entry_for_id(ProjectEntryId::from_proto(entry_id))
-                .ok_or_else(|| anyhow!("unknown entry"))?;
+                .ok_or_else(|| anyhow!("unknown entry {}", entry_id))?;
             entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
             entries_by_id_edits.push(Edit::Remove(entry.id));
         }
@@ -1306,28 +1248,16 @@ impl LocalSnapshot {
     }
 
     #[cfg(test)]
-    pub(crate) fn to_proto(
-        &self,
-        diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
-        visible: bool,
-    ) -> proto::Worktree {
+    pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree {
         let root_name = self.root_name.clone();
-        proto::Worktree {
-            id: self.id.0 as u64,
+        proto::UpdateWorktree {
+            project_id,
+            worktree_id: self.id().to_proto(),
             root_name,
-            entries: self
-                .entries_by_path
-                .iter()
-                .filter(|e| !e.is_ignored)
-                .map(Into::into)
-                .collect(),
-            diagnostic_summaries: diagnostic_summaries
-                .iter()
-                .map(|(path, summary)| summary.to_proto(&path.0))
-                .collect(),
-            visible,
+            updated_entries: self.entries_by_path.iter().map(Into::into).collect(),
+            removed_entries: Default::default(),
             scan_id: self.scan_id as u64,
-            is_complete: true,
+            is_last_update: true,
         }
     }
 
@@ -2709,31 +2639,14 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
     }
 }
 
-async fn send_worktree_update(
-    client: &Arc<Client>,
-    mut update: proto::UpdateWorktree,
-) -> Result<()> {
+async fn send_worktree_update(client: &Arc<Client>, update: proto::UpdateWorktree) -> Result<()> {
     #[cfg(any(test, feature = "test-support"))]
     const MAX_CHUNK_SIZE: usize = 2;
     #[cfg(not(any(test, feature = "test-support")))]
     const MAX_CHUNK_SIZE: usize = 256;
 
-    let mut is_last_update = false;
-    while !is_last_update {
-        let chunk_size = cmp::min(update.updated_entries.len(), MAX_CHUNK_SIZE);
-        let updated_entries = update.updated_entries.drain(..chunk_size).collect();
-        is_last_update = update.updated_entries.is_empty();
-        client
-            .request(proto::UpdateWorktree {
-                project_id: update.project_id,
-                worktree_id: update.worktree_id,
-                root_name: update.root_name.clone(),
-                updated_entries,
-                removed_entries: mem::take(&mut update.removed_entries),
-                scan_id: update.scan_id,
-                is_last_update,
-            })
-            .await?;
+    for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
+        client.request(update).await?;
     }
 
     Ok(())

crates/rpc/proto/zed.proto 🔗

@@ -168,7 +168,7 @@ message JoinProjectResponse {
 
     message Accept {
         uint32 replica_id = 1;
-        repeated Worktree worktrees = 2;
+        repeated WorktreeMetadata worktrees = 2;
         repeated Collaborator collaborators = 3;
         repeated LanguageServer language_servers = 4;        
     }
@@ -766,16 +766,6 @@ message User {
     string avatar_url = 3;
 }
 
-message Worktree {
-    uint64 id = 1;
-    string root_name = 2;
-    repeated Entry entries = 3;
-    repeated DiagnosticSummary diagnostic_summaries = 4;
-    bool visible = 5;
-    uint64 scan_id = 6;
-    bool is_complete = 7;
-}
-
 message File {
     uint64 worktree_id = 1;
     optional uint64 entry_id = 2;

crates/rpc/src/proto.rs 🔗

@@ -5,6 +5,7 @@ use futures::{SinkExt as _, StreamExt as _};
 use prost::Message as _;
 use serde::Serialize;
 use std::any::{Any, TypeId};
+use std::{cmp, iter, mem};
 use std::{
     fmt::Debug,
     io,
@@ -390,6 +391,31 @@ impl From<Nonce> for u128 {
     }
 }
 
+pub fn split_worktree_update(
+    mut message: UpdateWorktree,
+    max_chunk_size: usize,
+) -> impl Iterator<Item = UpdateWorktree> {
+    let mut done = false;
+    iter::from_fn(move || {
+        if done {
+            return None;
+        }
+
+        let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
+        let updated_entries = message.updated_entries.drain(..chunk_size).collect();
+        done = message.updated_entries.is_empty();
+        Some(UpdateWorktree {
+            project_id: message.project_id,
+            worktree_id: message.worktree_id,
+            root_name: message.root_name.clone(),
+            updated_entries,
+            removed_entries: mem::take(&mut message.removed_entries),
+            scan_id: message.scan_id,
+            is_last_update: done && message.is_last_update,
+        })
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;