WIP

Antonio Scandurra and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

server/src/rpc.rs    | 83 +++++++++++++++++++++++++--------------------
zed/src/presence.rs  |  6 +-
zrpc/proto/zed.proto | 12 ++----
zrpc/src/proto.rs    |  4 -
4 files changed, 54 insertions(+), 51 deletions(-)

Detailed changes

server/src/rpc.rs 🔗

@@ -114,8 +114,7 @@ impl Server {
             .add_handler(Server::join_channel)
             .add_handler(Server::leave_channel)
             .add_handler(Server::send_channel_message)
-            .add_handler(Server::get_channel_messages)
-            .add_handler(Server::get_collaborators);
+            .add_handler(Server::get_channel_messages);
 
         Arc::new(server)
     }
@@ -307,7 +306,7 @@ impl Server {
         let mut state = self.state.write().await;
         let worktree_id = state.add_worktree(Worktree {
             host_connection_id: request.sender_id,
-            collaborator_user_ids,
+            collaborator_user_ids: collaborator_user_ids.clone(),
             root_name: request.payload.root_name,
             share: None,
         });
@@ -315,6 +314,8 @@ impl Server {
         self.peer
             .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
             .await?;
+        self.update_collaborators(&collaborator_user_ids).await?;
+
         Ok(())
     }
 
@@ -341,6 +342,10 @@ impl Server {
             self.peer
                 .respond(request.receipt(), proto::ShareWorktreeResponse {})
                 .await?;
+
+            let collaborator_user_ids = worktree.collaborator_user_ids.clone();
+            drop(state);
+            self.update_collaborators(&collaborator_user_ids).await?;
         } else {
             self.peer
                 .respond_with_error(
@@ -361,6 +366,7 @@ impl Server {
         let worktree_id = request.payload.worktree_id;
 
         let connection_ids;
+        let collaborator_user_ids;
         {
             let mut state = self.state.write().await;
             let worktree = state.write_worktree(worktree_id, request.sender_id)?;
@@ -369,6 +375,7 @@ impl Server {
             }
 
             connection_ids = worktree.connection_ids();
+            collaborator_user_ids = worktree.collaborator_user_ids.clone();
             worktree.share.take();
             for connection_id in &connection_ids {
                 if let Some(connection) = state.connections.get_mut(connection_id) {
@@ -382,6 +389,7 @@ impl Server {
                 .send(conn_id, proto::UnshareWorktree { worktree_id })
         })
         .await?;
+        self.update_collaborators(&collaborator_user_ids).await?;
 
         Ok(())
     }
@@ -399,6 +407,7 @@ impl Server {
 
         let response;
         let connection_ids;
+        let collaborator_user_ids;
         let mut state = self.state.write().await;
         match state.join_worktree(request.sender_id, user_id, worktree_id) {
             Ok((peer_replica_id, worktree)) => {
@@ -418,6 +427,7 @@ impl Server {
                     }
                 }
                 connection_ids = worktree.connection_ids();
+                collaborator_user_ids = worktree.collaborator_user_ids.clone();
                 response = proto::JoinWorktreeResponse {
                     worktree: Some(proto::Worktree {
                         id: worktree_id,
@@ -455,6 +465,7 @@ impl Server {
         })
         .await?;
         self.peer.respond(request.receipt(), response).await?;
+        self.update_collaborators(&collaborator_user_ids).await?;
 
         Ok(())
     }
@@ -683,14 +694,12 @@ impl Server {
         Ok(())
     }
 
-    async fn get_collaborators(
-        self: Arc<Server>,
-        request: TypedEnvelope<proto::GetCollaborators>,
-    ) -> tide::Result<()> {
-        let mut collaborators = HashMap::new();
-        {
-            let state = self.state.read().await;
-            let user_id = state.user_id_for_connection(request.sender_id)?;
+    async fn update_collaborators(self: &Arc<Server>, user_ids: &[UserId]) -> tide::Result<()> {
+        let mut send_futures = Vec::new();
+
+        let state = self.state.read().await;
+        for user_id in user_ids {
+            let mut collaborators = HashMap::new();
             for worktree_id in state
                 .visible_worktrees_by_user_id
                 .get(&user_id)
@@ -698,21 +707,11 @@ impl Server {
             {
                 let worktree = &state.worktrees[worktree_id];
 
-                let mut participants = Vec::new();
-                for collaborator_user_id in &worktree.collaborator_user_ids {
-                    collaborators
-                        .entry(*collaborator_user_id)
-                        .or_insert_with(|| proto::Collaborator {
-                            user_id: collaborator_user_id.to_proto(),
-                            worktrees: Vec::new(),
-                            is_online: state.is_online(*collaborator_user_id),
-                        });
-
-                    if let Ok(share) = worktree.share() {
-                        let mut conn_ids = state.user_connection_ids(*collaborator_user_id);
-                        if conn_ids.any(|c| share.guest_connection_ids.contains_key(&c)) {
-                            participants.push(collaborator_user_id.to_proto());
-                        }
+                let mut participants = HashSet::new();
+                if let Ok(share) = worktree.share() {
+                    for guest_connection_id in share.guest_connection_ids.keys() {
+                        let user_id = state.user_id_for_connection(*guest_connection_id)?;
+                        participants.insert(user_id.to_proto());
                     }
                 }
 
@@ -723,24 +722,34 @@ impl Server {
                         .or_insert_with(|| proto::Collaborator {
                             user_id: host_user_id.to_proto(),
                             worktrees: Vec::new(),
-                            is_online: true,
                         });
-                host.worktrees.push(proto::CollaboratorWorktree {
+                host.worktrees.push(proto::WorktreeMetadata {
                     root_name: worktree.root_name.clone(),
                     is_shared: worktree.share().is_ok(),
-                    participants,
+                    participants: participants.into_iter().collect(),
                 });
             }
+
+            let connection_ids = self
+                .state
+                .read()
+                .await
+                .user_connection_ids(*user_id)
+                .collect::<Vec<_>>();
+
+            let collaborators = collaborators.into_values().collect::<Vec<_>>();
+            for connection_id in connection_ids {
+                send_futures.push(self.peer.send(
+                    connection_id,
+                    proto::UpdateCollaborators {
+                        collaborators: collaborators.clone(),
+                    },
+                ));
+            }
         }
 
-        self.peer
-            .respond(
-                request.receipt(),
-                proto::GetCollaboratorsResponse {
-                    collaborators: collaborators.into_values().collect(),
-                },
-            )
-            .await?;
+        futures::future::try_join_all(send_futures).await?;
+
         Ok(())
     }
 

zed/src/presence.rs 🔗

@@ -20,11 +20,11 @@ pub struct Presence {
 #[derive(Debug)]
 struct Collaborator {
     user: Arc<User>,
-    worktrees: Vec<CollaboratorWorktree>,
+    worktrees: Vec<WorktreeMetadata>,
 }
 
 #[derive(Debug)]
-struct CollaboratorWorktree {
+struct WorktreeMetadata {
     root_name: String,
     is_shared: bool,
     participants: Vec<Arc<User>>,
@@ -118,7 +118,7 @@ impl Collaborator {
             for participant_id in worktree.participants {
                 participants.push(user_store.fetch_user(participant_id).await?);
             }
-            worktrees.push(CollaboratorWorktree {
+            worktrees.push(WorktreeMetadata {
                 root_name: worktree.root_name,
                 is_shared: worktree.is_shared,
                 participants,

zrpc/proto/zed.proto 🔗

@@ -38,8 +38,7 @@ message Envelope {
         OpenWorktree open_worktree = 33;
         OpenWorktreeResponse open_worktree_response = 34;
         UnshareWorktree unshare_worktree = 35;
-        GetCollaborators get_collaborators = 36;
-        GetCollaboratorsResponse get_collaborators_response = 37;
+        UpdateCollaborators update_collaborators = 36;
     }
 }
 
@@ -186,9 +185,7 @@ message GetChannelMessagesResponse {
     bool done = 2;
 }
 
-message GetCollaborators {}
-
-message GetCollaboratorsResponse {
+message UpdateCollaborators {
     repeated Collaborator collaborators = 1;
 }
 
@@ -337,11 +334,10 @@ message ChannelMessage {
 
 message Collaborator {
     uint64 user_id = 1;
-    repeated CollaboratorWorktree worktrees = 2;
-    bool is_online = 3;
+    repeated WorktreeMetadata worktrees = 2;
 }
 
-message CollaboratorWorktree {
+message WorktreeMetadata {
     string root_name = 1;
     bool is_shared = 2;
     repeated uint64 participants = 3;

zrpc/src/proto.rs 🔗

@@ -131,8 +131,7 @@ messages!(
     GetChannelMessagesResponse,
     GetChannels,
     GetChannelsResponse,
-    GetCollaborators,
-    GetCollaboratorsResponse,
+    UpdateCollaborators,
     GetUsers,
     GetUsersResponse,
     JoinChannel,
@@ -170,7 +169,6 @@ request_messages!(
     (UnshareWorktree, Ack),
     (SendChannelMessage, SendChannelMessageResponse),
     (GetChannelMessages, GetChannelMessagesResponse),
-    (GetCollaborators, GetCollaboratorsResponse),
 );
 
 entity_messages!(