Share new worktrees when resharing project

Antonio Scandurra and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/call/src/room.rs                                 |  2 
crates/collab/src/rpc.rs                                | 63 +++++-----
crates/collab/src/tests/randomized_integration_tests.rs |  5 
crates/project/src/project.rs                           | 30 +---
crates/project/src/worktree.rs                          | 24 +--
5 files changed, 54 insertions(+), 70 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -66,7 +66,7 @@ impl Entity for Room {
     fn release(&mut self, _: &mut MutableAppContext) {
         if self.status.is_online() {
             log::info!("room was released, sending leave message");
-            self.client.send(proto::LeaveRoom {}).log_err();
+            let _ = self.client.send(proto::LeaveRoom {});
         }
     }
 }

crates/collab/src/rpc.rs 🔗

@@ -672,15 +672,17 @@ impl<'a> Drop for ConnectionPoolGuard<'a> {
 }
 
 fn broadcast<F>(
-    sender_id: ConnectionId,
+    sender_id: Option<ConnectionId>,
     receiver_ids: impl IntoIterator<Item = ConnectionId>,
     mut f: F,
 ) where
     F: FnMut(ConnectionId) -> anyhow::Result<()>,
 {
     for receiver_id in receiver_ids {
-        if receiver_id != sender_id {
-            f(receiver_id).trace_err();
+        if Some(receiver_id) != sender_id {
+            if let Err(error) = f(receiver_id) {
+                tracing::error!("failed to send to {:?} {}", receiver_id, error);
+            }
         }
     }
 }
@@ -998,7 +1000,7 @@ async fn rejoin_room(
             }
 
             broadcast(
-                session.connection_id,
+                Some(session.connection_id),
                 project
                     .collaborators
                     .iter()
@@ -1279,7 +1281,7 @@ async fn unshare_project(message: proto::UnshareProject, session: Session) -> Re
         .await?;
 
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         guest_connection_ids.iter().copied(),
         |conn_id| session.peer.send(conn_id, message.clone()),
     );
@@ -1430,7 +1432,7 @@ async fn update_project(
         .update_project(project_id, session.connection_id, &request.worktrees)
         .await?;
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         guest_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1456,7 +1458,7 @@ async fn update_worktree(
         .await?;
 
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         guest_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1479,7 +1481,7 @@ async fn update_diagnostic_summary(
         .await?;
 
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         guest_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1502,7 +1504,7 @@ async fn start_language_server(
         .await?;
 
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         guest_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1525,7 +1527,7 @@ async fn update_language_server(
         .project_connection_ids(project_id, session.connection_id)
         .await?;
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         project_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1600,11 +1602,15 @@ async fn save_buffer(
     let project_connection_ids = collaborators
         .iter()
         .map(|collaborator| collaborator.connection_id);
-    broadcast(host_connection_id, project_connection_ids, |conn_id| {
-        session
-            .peer
-            .forward_send(host_connection_id, conn_id, response_payload.clone())
-    });
+    broadcast(
+        Some(host_connection_id),
+        project_connection_ids,
+        |conn_id| {
+            session
+                .peer
+                .forward_send(host_connection_id, conn_id, response_payload.clone())
+        },
+    );
     response.send(response_payload)?;
     Ok(())
 }
@@ -1637,7 +1643,7 @@ async fn update_buffer(
     session.executor.record_backtrace();
 
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         project_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1658,7 +1664,7 @@ async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session)
         .await?;
 
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         project_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1677,7 +1683,7 @@ async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Re
         .project_connection_ids(project_id, session.connection_id)
         .await?;
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         project_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1696,7 +1702,7 @@ async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<(
         .project_connection_ids(project_id, session.connection_id)
         .await?;
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         project_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -1988,7 +1994,7 @@ async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> R
         .project_connection_ids(project_id, session.connection_id)
         .await?;
     broadcast(
-        session.connection_id,
+        Some(session.connection_id),
         project_connection_ids.iter().copied(),
         |connection_id| {
             session
@@ -2098,21 +2104,20 @@ fn contact_for_user(
 }
 
 fn room_updated(room: &proto::Room, peer: &Peer) {
-    for participant in &room.participants {
-        if let Some(peer_id) = participant
-            .peer_id
-            .ok_or_else(|| anyhow!("invalid participant peer id"))
-            .trace_err()
-        {
+    broadcast(
+        None,
+        room.participants
+            .iter()
+            .filter_map(|participant| Some(participant.peer_id?.into())),
+        |peer_id| {
             peer.send(
                 peer_id.into(),
                 proto::RoomUpdated {
                     room: Some(room.clone()),
                 },
             )
-            .trace_err();
-        }
-    }
+        },
+    );
 }
 
 async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -290,10 +290,11 @@ async fn test_random_collaboration(
                             assert_eq!(
                                 guest_snapshot.entries(false).collect::<Vec<_>>(),
                                 host_snapshot.entries(false).collect::<Vec<_>>(),
-                                "{} has different snapshot than the host for worktree {} ({:?})",
+                                "{} has different snapshot than the host for worktree {} ({:?}) and project {:?}",
                                 client.username,
                                 id,
-                                host_snapshot.abs_path()
+                                host_snapshot.abs_path(),
+                                host_project.read_with(host_cx, |project, _| project.remote_id())
                             );
                             assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
                         }

crates/project/src/project.rs 🔗

@@ -1037,7 +1037,7 @@ impl Project {
                     if update_project.is_ok() {
                         for worktree in worktrees {
                             worktree.update(&mut cx, |worktree, cx| {
-                                let worktree = &mut worktree.as_local_mut().unwrap();
+                                let worktree = worktree.as_local_mut().unwrap();
                                 worktree.share(project_id, cx).detach_and_log_err(cx)
                             });
                         }
@@ -1062,17 +1062,7 @@ impl Project {
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
         self.set_collaborators_from_proto(message.collaborators, cx)?;
-        for worktree in self.worktrees.iter() {
-            if let Some(worktree) = worktree.upgrade(&cx) {
-                worktree.update(cx, |worktree, _| {
-                    if let Some(worktree) = worktree.as_local_mut() {
-                        worktree.reshare()
-                    } else {
-                        Ok(())
-                    }
-                })?;
-            }
-        }
+        let _ = self.metadata_changed(cx);
         Ok(())
     }
 
@@ -6259,18 +6249,14 @@ impl Entity for Project {
     fn release(&mut self, _: &mut gpui::MutableAppContext) {
         match &self.client_state {
             Some(ProjectClientState::Local { remote_id, .. }) => {
-                self.client
-                    .send(proto::UnshareProject {
-                        project_id: *remote_id,
-                    })
-                    .log_err();
+                let _ = self.client.send(proto::UnshareProject {
+                    project_id: *remote_id,
+                });
             }
             Some(ProjectClientState::Remote { remote_id, .. }) => {
-                self.client
-                    .send(proto::LeaveProject {
-                        project_id: *remote_id,
-                    })
-                    .log_err();
+                let _ = self.client.send(proto::LeaveProject {
+                    project_id: *remote_id,
+                });
             }
             _ => {}
         }

crates/project/src/worktree.rs 🔗

@@ -166,7 +166,7 @@ enum ScanState {
 struct ShareState {
     project_id: u64,
     snapshots_tx: watch::Sender<LocalSnapshot>,
-    reshared: watch::Sender<()>,
+    resume_updates: watch::Sender<()>,
     _maintain_remote_snapshot: Task<Option<()>>,
 }
 
@@ -975,12 +975,12 @@ impl LocalWorktree {
     pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
         let (share_tx, share_rx) = oneshot::channel();
 
-        if self.share.is_some() {
+        if let Some(share) = self.share.as_mut() {
             let _ = share_tx.send(());
+            *share.resume_updates.borrow_mut() = ();
         } else {
             let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot());
-            let (reshared_tx, mut reshared_rx) = watch::channel();
-            let _ = reshared_rx.try_recv();
+            let (resume_updates_tx, mut resume_updates_rx) = watch::channel();
             let worktree_id = cx.model_id() as u64;
 
             for (path, summary) in self.diagnostic_summaries.iter() {
@@ -1022,10 +1022,11 @@ impl LocalWorktree {
                         let update =
                             snapshot.build_update(&prev_snapshot, project_id, worktree_id, true);
                         for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
+                            let _ = resume_updates_rx.try_recv();
                             while let Err(error) = client.request(update.clone()).await {
                                 log::error!("failed to send worktree update: {}", error);
-                                log::info!("waiting for worktree to be reshared");
-                                if reshared_rx.next().await.is_none() {
+                                log::info!("waiting to resume updates");
+                                if resume_updates_rx.next().await.is_none() {
                                     return Ok(());
                                 }
                             }
@@ -1046,7 +1047,7 @@ impl LocalWorktree {
             self.share = Some(ShareState {
                 project_id,
                 snapshots_tx,
-                reshared: reshared_tx,
+                resume_updates: resume_updates_tx,
                 _maintain_remote_snapshot,
             });
         }
@@ -1059,15 +1060,6 @@ impl LocalWorktree {
         self.share.take();
     }
 
-    pub fn reshare(&mut self) -> Result<()> {
-        let share = self
-            .share
-            .as_mut()
-            .ok_or_else(|| anyhow!("can't reshare a worktree that wasn't shared"))?;
-        *share.reshared.borrow_mut() = ();
-        Ok(())
-    }
-
     pub fn is_shared(&self) -> bool {
         self.share.is_some()
     }