Remove ShareWorktree message

Nathan Sobo , Max Brunsfeld , and Antonio Scandurra created

Instead, create an empty worktree on guests when a worktree is first *registered*, then update it via an initial UpdateWorktree message.

This prevents the host from referencing a worktree in definition RPC responses that hasn't yet been observed by the guest. We could have waited until the entire worktree was shared, but this could take a long time, so instead we create an empty one on guests and proceed from there.

We still have randomized test failures as of this commit:

SEED=9519 MAX_PEERS=2 ITERATIONS=10000 OPERATIONS=7 ct -p zed-server test_random_collaboration

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>
Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

crates/project/src/project.rs  | 19 +++++++-----
crates/project/src/worktree.rs | 18 +++++++++--
crates/rpc/src/proto.rs        |  1 
crates/server/src/rpc.rs       | 25 +++++++---------
crates/server/src/rpc/store.rs | 55 +++++++++++++++++------------------
5 files changed, 63 insertions(+), 55 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -169,7 +169,7 @@ impl DiagnosticSummary {
         this
     }
 
-    pub fn to_proto(&self, path: Arc<Path>) -> proto::DiagnosticSummary {
+    pub fn to_proto(&self, path: &Path) -> proto::DiagnosticSummary {
         proto::DiagnosticSummary {
             path: path.to_string_lossy().to_string(),
             error_count: self.error_count as u32,
@@ -195,7 +195,7 @@ impl Project {
         client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updated);
         client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updating);
         client.add_entity_message_handler(Self::handle_remove_collaborator);
-        client.add_entity_message_handler(Self::handle_share_worktree);
+        client.add_entity_message_handler(Self::handle_register_worktree);
         client.add_entity_message_handler(Self::handle_unregister_worktree);
         client.add_entity_message_handler(Self::handle_unshare_project);
         client.add_entity_message_handler(Self::handle_update_buffer_file);
@@ -2347,19 +2347,22 @@ impl Project {
         })
     }
 
-    async fn handle_share_worktree(
+    async fn handle_register_worktree(
         this: ModelHandle<Self>,
-        envelope: TypedEnvelope<proto::ShareWorktree>,
+        envelope: TypedEnvelope<proto::RegisterWorktree>,
         client: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<()> {
         this.update(&mut cx, |this, cx| {
             let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
             let replica_id = this.replica_id();
-            let worktree = envelope
-                .payload
-                .worktree
-                .ok_or_else(|| anyhow!("invalid worktree"))?;
+            let worktree = proto::Worktree {
+                id: envelope.payload.worktree_id,
+                root_name: envelope.payload.root_name,
+                entries: Default::default(),
+                diagnostic_summaries: Default::default(),
+                weak: envelope.payload.weak,
+            };
             let (worktree, load_task) =
                 Worktree::remote(remote_id, replica_id, worktree, client, cx);
             this.add_worktree(&worktree, cx);

crates/project/src/worktree.rs 🔗

@@ -771,12 +771,10 @@ impl LocalWorktree {
             let worktree_id = cx.model_id() as u64;
             let (snapshots_to_send_tx, snapshots_to_send_rx) =
                 smol::channel::unbounded::<LocalSnapshot>();
-            let (mut share_tx, mut share_rx) = oneshot::channel();
             let maintain_remote_snapshot = cx.background().spawn({
                 let rpc = rpc.clone();
                 let snapshot = snapshot.clone();
                 let diagnostic_summaries = self.diagnostic_summaries.clone();
-                let weak = self.weak;
                 async move {
                     if let Err(error) = rpc
                         .request(proto::UpdateWorktree {
@@ -799,6 +797,14 @@ impl LocalWorktree {
                         let _ = share_tx.try_send(Ok(()));
                     }
 
+                    for (path, summary) in diagnostic_summaries.iter() {
+                        rpc.send(proto::UpdateDiagnosticSummary {
+                            project_id,
+                            worktree_id,
+                            summary: Some(summary.to_proto(&path.0)),
+                        })?;
+                    }
+
                     let mut prev_snapshot = snapshot;
                     while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                         let message =
@@ -819,7 +825,10 @@ impl LocalWorktree {
         }
 
         async move {
-            share_rx.next().await;
+            share_rx
+                .next()
+                .await
+                .unwrap_or_else(|| Err(anyhow!("share ended")))
         }
     }
 
@@ -1014,6 +1023,7 @@ impl Snapshot {
 }
 
 impl LocalSnapshot {
+    #[cfg(test)]
     pub(crate) fn to_proto(
         &self,
         diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
@@ -1031,7 +1041,7 @@ impl LocalSnapshot {
                 .collect(),
             diagnostic_summaries: diagnostic_summaries
                 .iter()
-                .map(|(path, summary)| summary.to_proto(path.0.clone()))
+                .map(|(path, summary)| summary.to_proto(&path.0))
                 .collect(),
             weak,
         }

crates/rpc/src/proto.rs 🔗

@@ -262,6 +262,7 @@ entity_messages!(
     UpdateBuffer,
     UpdateBufferFile,
     UpdateDiagnosticSummary,
+    RegisterWorktree,
     UpdateWorktree,
 );
 

crates/server/src/rpc.rs 🔗

@@ -334,16 +334,16 @@ impl Server {
                     replica_id: 0,
                     user_id: joined.project.host_user_id.to_proto(),
                 });
-                let worktrees = joined
-                    .project
+                let worktrees = share
                     .worktrees
                     .iter()
-                    .filter_map(|(id, worktree)| {
-                        worktree.share.as_ref().map(|share| proto::Worktree {
+                    .filter_map(|(id, shared_worktree)| {
+                        let worktree = joined.project.worktrees.get(&id)?;
+                        Some(proto::Worktree {
                             id: *id,
                             root_name: worktree.root_name.clone(),
-                            entries: share.entries.values().cloned().collect(),
-                            diagnostic_summaries: share
+                            entries: shared_worktree.entries.values().cloned().collect(),
+                            diagnostic_summaries: shared_worktree
                                 .diagnostic_summaries
                                 .values()
                                 .cloned()
@@ -437,7 +437,6 @@ impl Server {
                 Worktree {
                     authorized_user_ids: contact_user_ids.clone(),
                     root_name: request.payload.root_name.clone(),
-                    share: None,
                     weak: request.payload.weak,
                 },
             )?;
@@ -1164,7 +1163,7 @@ mod tests {
         cell::Cell,
         env,
         ops::Deref,
-        path::Path,
+        path::{Path, PathBuf},
         rc::Rc,
         sync::{
             atomic::{AtomicBool, Ordering::SeqCst},
@@ -2115,16 +2114,14 @@ mod tests {
                 let worktree = store
                     .project(project_id)
                     .unwrap()
+                    .share
+                    .as_ref()
+                    .unwrap()
                     .worktrees
                     .get(&worktree_id.to_proto())
                     .unwrap();
 
-                !worktree
-                    .share
-                    .as_ref()
-                    .unwrap()
-                    .diagnostic_summaries
-                    .is_empty()
+                !worktree.diagnostic_summaries.is_empty()
             })
             .await;
 

crates/server/src/rpc/store.rs 🔗

@@ -30,7 +30,6 @@ pub struct Project {
 pub struct Worktree {
     pub authorized_user_ids: Vec<UserId>,
     pub root_name: String,
-    pub share: Option<WorktreeShare>,
     pub weak: bool,
 }
 
@@ -38,8 +37,10 @@ pub struct Worktree {
 pub struct ProjectShare {
     pub guests: HashMap<ConnectionId, (ReplicaId, UserId)>,
     pub active_replica_ids: HashSet<ReplicaId>,
+    pub worktrees: HashMap<u64, WorktreeShare>,
 }
 
+#[derive(Default)]
 pub struct WorktreeShare {
     pub entries: HashMap<u64, proto::Entry>,
     pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
@@ -74,11 +75,6 @@ pub struct LeftProject {
     pub authorized_user_ids: Vec<UserId>,
 }
 
-pub struct SharedWorktree {
-    pub authorized_user_ids: Vec<UserId>,
-    pub connection_ids: Vec<ConnectionId>,
-}
-
 impl Store {
     pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
         self.connections.insert(
@@ -272,6 +268,9 @@ impl Store {
                 connection.projects.insert(project_id);
             }
             project.worktrees.insert(worktree_id, worktree);
+            if let Ok(share) = project.share_mut() {
+                share.worktrees.insert(worktree_id, Default::default());
+            }
 
             #[cfg(test)]
             self.check_invariants();
@@ -326,8 +325,9 @@ impl Store {
             .ok_or_else(|| anyhow!("no such worktree"))?;
 
         let mut guest_connection_ids = Vec::new();
-        if let Some(share) = &project.share {
+        if let Ok(share) = project.share_mut() {
             guest_connection_ids.extend(share.guests.keys());
+            share.worktrees.remove(&worktree_id);
         }
 
         for authorized_user_id in &worktree.authorized_user_ids {
@@ -349,7 +349,11 @@ impl Store {
     pub fn share_project(&mut self, project_id: u64, connection_id: ConnectionId) -> bool {
         if let Some(project) = self.projects.get_mut(&project_id) {
             if project.host_connection_id == connection_id {
-                project.share = Some(ProjectShare::default());
+                let mut share = ProjectShare::default();
+                for worktree_id in project.worktrees.keys() {
+                    share.worktrees.insert(*worktree_id, Default::default());
+                }
+                project.share = Some(share);
                 return true;
             }
         }
@@ -380,10 +384,6 @@ impl Store {
                 }
             }
 
-            for worktree in project.worktrees.values_mut() {
-                worktree.share.take();
-            }
-
             #[cfg(test)]
             self.check_invariants();
 
@@ -407,17 +407,16 @@ impl Store {
             .projects
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
-        let worktree = project
-            .worktrees
-            .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?;
         if project.host_connection_id == connection_id {
-            if let Some(share) = worktree.share.as_mut() {
-                share
-                    .diagnostic_summaries
-                    .insert(summary.path.clone().into(), summary);
-                return Ok(project.connection_ids());
-            }
+            let worktree = project
+                .share_mut()?
+                .worktrees
+                .get_mut(&worktree_id)
+                .ok_or_else(|| anyhow!("no such worktree"))?;
+            worktree
+                .diagnostic_summaries
+                .insert(summary.path.clone().into(), summary);
+            return Ok(project.connection_ids());
         }
 
         Err(anyhow!("no such worktree"))?
@@ -508,18 +507,16 @@ impl Store {
         updated_entries: &[proto::Entry],
     ) -> tide::Result<Vec<ConnectionId>> {
         let project = self.write_project(project_id, connection_id)?;
-        let share = project
+        let worktree = project
+            .share_mut()?
             .worktrees
             .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?
-            .share
-            .as_mut()
-            .ok_or_else(|| anyhow!("worktree is not shared"))?;
+            .ok_or_else(|| anyhow!("no such worktree"))?;
         for entry_id in removed_entries {
-            share.entries.remove(&entry_id);
+            worktree.entries.remove(&entry_id);
         }
         for entry in updated_entries {
-            share.entries.insert(entry.id, entry.clone());
+            worktree.entries.insert(entry.id, entry.clone());
         }
         Ok(project.connection_ids())
     }