Maintain a set of peers as they join and leave the worktree

Antonio Scandurra created

Change summary

zed-rpc/proto/zed.proto  | 12 +++--
zed-rpc/src/proto.rs     |  1 
zed/src/editor/buffer.rs |  2 
zed/src/worktree.rs      | 96 ++++++++++++++++++++++++++++++++++++++---
4 files changed, 98 insertions(+), 13 deletions(-)

Detailed changes

zed-rpc/proto/zed.proto 🔗

@@ -16,7 +16,8 @@ message Envelope {
         OpenBufferResponse open_buffer_response = 11;
         CloseBuffer close_buffer = 12;
         UpdateBuffer update_buffer = 13;
-        RemoveGuest remove_guest = 14;
+        AddGuest add_guest = 14;
+        RemoveGuest remove_guest = 15;
     }
 }
 
@@ -45,19 +46,20 @@ message OpenWorktree {
 
 message OpenWorktreeResponse {
     Worktree worktree = 1;
-    optional uint32 replica_id = 2;
+    uint32 replica_id = 2;
+    uint32 host_peer_id = 3;
 }
 
 message AddGuest {
     uint64 worktree_id = 1;
-    uint32 replica_id = 2;
-    User user = 3;
+    uint32 peer_id = 2;
+    uint32 replica_id = 3;
+    User user = 4;
 }
 
 message RemoveGuest {
     uint64 worktree_id = 1;
     uint32 peer_id = 2;
-    uint32 replica_id = 3;
 }
 
 message OpenBuffer {

zed-rpc/src/proto.rs 🔗

@@ -74,6 +74,7 @@ request_message!(OpenWorktree, OpenWorktreeResponse);
 request_message!(OpenBuffer, OpenBufferResponse);
 message!(CloseBuffer);
 message!(UpdateBuffer);
+message!(AddGuest);
 message!(RemoveGuest);
 
 /// A stream of protobuf messages.

zed/src/editor/buffer.rs 🔗

@@ -1494,7 +1494,7 @@ impl Buffer {
         self.operations.push(operation);
     }
 
-    pub fn peer_left(&mut self, replica_id: ReplicaId, cx: &mut ModelContext<Self>) {
+    pub fn remove_guest(&mut self, replica_id: ReplicaId, cx: &mut ModelContext<Self>) {
         self.selections
             .retain(|set_id, _| set_id.replica_id != replica_id);
         cx.notify();

zed/src/worktree.rs 🔗

@@ -37,6 +37,7 @@ use std::{
     fmt, fs,
     future::Future,
     io,
+    iter::FromIterator,
     ops::Deref,
     os::unix::fs::MetadataExt,
     path::{Path, PathBuf},
@@ -53,6 +54,7 @@ lazy_static! {
 }
 
 pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
+    rpc.on_message(remote::add_guest, cx);
     rpc.on_message(remote::remove_guest, cx);
     rpc.on_message(remote::open_buffer, cx);
     rpc.on_message(remote::close_buffer, cx);
@@ -100,16 +102,16 @@ impl Worktree {
         let worktree_message = open_worktree_response
             .worktree
             .ok_or_else(|| anyhow!("empty worktree"))?;
-        let replica_id = open_worktree_response
-            .replica_id
-            .ok_or_else(|| anyhow!("empty replica id"))?;
+        let replica_id = open_worktree_response.replica_id;
+        let host_peer_id = PeerId(open_worktree_response.host_peer_id);
         let worktree = cx.update(|cx| {
             cx.add_model(|cx| {
                 Worktree::Remote(RemoteWorktree::new(
                     id,
+                    replica_id as ReplicaId,
+                    host_peer_id,
                     worktree_message,
                     rpc.clone(),
-                    replica_id as ReplicaId,
                     languages,
                     cx,
                 ))
@@ -154,6 +156,17 @@ impl Worktree {
         }
     }
 
+    pub fn add_guest(
+        &mut self,
+        envelope: TypedEnvelope<proto::AddGuest>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Result<()> {
+        match self {
+            Worktree::Local(worktree) => worktree.add_guest(envelope, cx),
+            Worktree::Remote(worktree) => worktree.add_guest(envelope, cx),
+        }
+    }
+
     pub fn remove_guest(
         &mut self,
         envelope: TypedEnvelope<proto::RemoveGuest>,
@@ -161,7 +174,7 @@ impl Worktree {
     ) -> Result<()> {
         match self {
             Worktree::Local(worktree) => worktree.remove_guest(envelope, cx),
-            Worktree::Remote(_) => todo!(),
+            Worktree::Remote(worktree) => worktree.remove_guest(envelope, cx),
         }
     }
 
@@ -261,6 +274,7 @@ pub struct LocalWorktree {
     rpc: Option<(rpc::Client, u64)>,
     open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
     shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
+    peers: HashMap<PeerId, ReplicaId>,
     languages: Arc<LanguageRegistry>,
 }
 
@@ -298,6 +312,7 @@ impl LocalWorktree {
             poll_scheduled: false,
             open_buffers: Default::default(),
             shared_buffers: Default::default(),
+            peers: Default::default(),
             rpc: None,
             languages,
         };
@@ -422,12 +437,34 @@ impl LocalWorktree {
         Ok(())
     }
 
+    pub fn add_guest(
+        &mut self,
+        envelope: TypedEnvelope<proto::AddGuest>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Result<()> {
+        self.peers.insert(
+            PeerId(envelope.payload.peer_id),
+            envelope.payload.replica_id as ReplicaId,
+        );
+        Ok(())
+    }
+
     pub fn remove_guest(
         &mut self,
         envelope: TypedEnvelope<proto::RemoveGuest>,
         cx: &mut ModelContext<Worktree>,
     ) -> Result<()> {
-        self.shared_buffers.remove(&envelope.original_sender_id()?);
+        let peer_id = PeerId(envelope.payload.peer_id);
+        let replica_id = self
+            .peers
+            .remove(&peer_id)
+            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
+        self.shared_buffers.remove(&peer_id);
+        for (_, buffer) in &self.open_buffers {
+            if let Some(buffer) = buffer.upgrade(&cx) {
+                buffer.update(cx, |buffer, cx| buffer.remove_guest(replica_id, cx));
+            }
+        }
         Ok(())
     }
 
@@ -715,15 +752,17 @@ pub struct RemoteWorktree {
     rpc: rpc::Client,
     replica_id: ReplicaId,
     open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
+    peers: HashMap<PeerId, ReplicaId>,
     languages: Arc<LanguageRegistry>,
 }
 
 impl RemoteWorktree {
     fn new(
         remote_id: u64,
+        replica_id: ReplicaId,
+        host_peer_id: PeerId,
         worktree: proto::Worktree,
         rpc: rpc::Client,
-        replica_id: ReplicaId,
         languages: Arc<LanguageRegistry>,
         cx: &mut ModelContext<Worktree>,
     ) -> Self {
@@ -779,6 +818,7 @@ impl RemoteWorktree {
             rpc,
             replica_id,
             open_buffers: Default::default(),
+            peers: HashMap::from_iter(Some((host_peer_id, 0))),
             languages,
         }
     }
@@ -837,6 +877,36 @@ impl RemoteWorktree {
             }
         })
     }
+
+    pub fn add_guest(
+        &mut self,
+        envelope: TypedEnvelope<proto::AddGuest>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Result<()> {
+        self.peers.insert(
+            PeerId(envelope.payload.peer_id),
+            envelope.payload.replica_id as ReplicaId,
+        );
+        Ok(())
+    }
+
+    pub fn remove_guest(
+        &mut self,
+        envelope: TypedEnvelope<proto::RemoveGuest>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Result<()> {
+        let peer_id = PeerId(envelope.payload.peer_id);
+        let replica_id = self
+            .peers
+            .remove(&peer_id)
+            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
+        for (_, buffer) in &self.open_buffers {
+            if let Some(buffer) = buffer.upgrade(&cx) {
+                buffer.update(cx, |buffer, cx| buffer.remove_guest(replica_id, cx));
+            }
+        }
+        Ok(())
+    }
 }
 
 #[derive(Clone)]
@@ -1938,6 +2008,18 @@ impl<'a> Iterator for ChildEntriesIter<'a> {
 mod remote {
     use super::*;
 
+    pub async fn add_guest(
+        envelope: TypedEnvelope<proto::AddGuest>,
+        rpc: &rpc::Client,
+        cx: &mut AsyncAppContext,
+    ) -> anyhow::Result<()> {
+        rpc.state
+            .lock()
+            .await
+            .shared_worktree(envelope.payload.worktree_id, cx)?
+            .update(cx, |worktree, cx| worktree.add_guest(envelope, cx))
+    }
+
     pub async fn remove_guest(
         envelope: TypedEnvelope<proto::RemoveGuest>,
         rpc: &rpc::Client,