Subscribe to worktree messages at the entity level

Nathan Sobo and Max Brunsfeld created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

server/src/tests.rs |   2 
zed/src/channel.rs  |  21 +--
zed/src/rpc.rs      |  33 ++++
zed/src/util.rs     |  12 +
zed/src/worktree.rs | 293 +++++++++++++++++++++++++++++++++++-----------
zrpc/src/peer.rs    |   9 
zrpc/src/proto.rs   | 168 +++++++++++++------------
7 files changed, 364 insertions(+), 174 deletions(-)

Detailed changes

server/src/tests.rs 🔗

@@ -14,7 +14,7 @@ use sqlx::{
     types::time::OffsetDateTime,
     Executor as _, Postgres,
 };
-use std::{path::Path, sync::Arc, time::SystemTime};
+use std::{path::Path, sync::Arc};
 use zed::{
     editor::Editor,
     fs::{FakeFs, Fs as _},

zed/src/channel.rs 🔗

@@ -1,5 +1,5 @@
 use crate::rpc::{self, Client};
-use futures::StreamExt;
+use anyhow::Result;
 use gpui::{Entity, ModelContext, Task, WeakModelHandle};
 use std::{
     collections::{HashMap, VecDeque},
@@ -22,7 +22,7 @@ pub struct Channel {
     first_message_id: Option<u64>,
     messages: Option<VecDeque<ChannelMessage>>,
     rpc: Arc<Client>,
-    _receive_messages: Task<()>,
+    _message_handler: Task<()>,
 }
 
 pub struct ChannelMessage {
@@ -50,28 +50,23 @@ impl Entity for Channel {
 
 impl Channel {
     pub fn new(details: ChannelDetails, rpc: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
-        let mut messages = rpc.subscribe();
-        let receive_messages = cx.spawn_weak(|this, mut cx| async move {
-            while let Some(message) = messages.next().await {
-                if let Some(this) = this.upgrade(&cx) {
-                    this.update(&mut cx, |this, cx| this.message_received(&message, cx));
-                }
-            }
-        });
+        let _message_handler = rpc.subscribe_from_model(details.id, cx, Self::handle_message_sent);
 
         Self {
             details,
             rpc,
             first_message_id: None,
             messages: None,
-            _receive_messages: receive_messages,
+            _message_handler,
         }
     }
 
-    fn message_received(
+    fn handle_message_sent(
         &mut self,
         message: &TypedEnvelope<ChannelMessageSent>,
+        rpc: rpc::Client,
         cx: &mut ModelContext<Self>,
-    ) {
+    ) -> Result<()> {
+        Ok(())
     }
 }

zed/src/rpc.rs 🔗

@@ -2,14 +2,15 @@ use crate::{language::LanguageRegistry, worktree::Worktree};
 use anyhow::{anyhow, Context, Result};
 use async_tungstenite::tungstenite::http::Request;
 use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
-use futures::Stream;
-use gpui::{AsyncAppContext, ModelHandle, Task, WeakModelHandle};
+use futures::StreamExt;
+use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
 use lazy_static::lazy_static;
 use smol::lock::RwLock;
 use std::collections::HashMap;
 use std::time::Duration;
 use std::{convert::TryFrom, future::Future, sync::Arc};
 use surf::Url;
+use zrpc::proto::EntityMessage;
 pub use zrpc::{proto, ConnectionId, PeerId, TypedEnvelope};
 use zrpc::{
     proto::{EnvelopedMessage, RequestMessage},
@@ -82,8 +83,32 @@ impl Client {
         });
     }
 
-    pub fn subscribe<T: EnvelopedMessage>(&self) -> impl Stream<Item = Arc<TypedEnvelope<T>>> {
-        self.peer.subscribe()
+    pub fn subscribe_from_model<T, M, F>(
+        &self,
+        remote_id: u64,
+        cx: &mut ModelContext<M>,
+        mut handler: F,
+    ) -> Task<()>
+    where
+        T: EntityMessage,
+        M: Entity,
+        F: 'static + FnMut(&mut M, &TypedEnvelope<T>, Client, &mut ModelContext<M>) -> Result<()>,
+    {
+        let rpc = self.clone();
+        let mut incoming = self.peer.subscribe::<T>();
+        cx.spawn_weak(|model, mut cx| async move {
+            while let Some(envelope) = incoming.next().await {
+                if envelope.payload.remote_entity_id() == remote_id {
+                    if let Some(model) = model.upgrade(&cx) {
+                        model.update(&mut cx, |model, cx| {
+                            if let Err(error) = handler(model, &envelope, rpc.clone(), cx) {
+                                log::error!("error handling message: {}", error)
+                            }
+                        });
+                    }
+                }
+            }
+        })
     }
 
     pub async fn log_in_and_connect(

zed/src/util.rs 🔗

@@ -1,3 +1,4 @@
+use futures::Future;
 use rand::prelude::*;
 use std::cmp::Ordering;
 
@@ -81,6 +82,17 @@ impl<T: Rng> Iterator for RandomCharIter<T> {
     }
 }
 
+pub async fn log_async_errors<F>(f: F) -> impl Future<Output = ()>
+where
+    F: Future<Output = anyhow::Result<()>>,
+{
+    async {
+        if let Err(error) = f.await {
+            log::error!("{}", error)
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

zed/src/worktree.rs 🔗

@@ -10,7 +10,7 @@ use crate::{
     rpc::{self, proto},
     sum_tree::{self, Cursor, Edit, SumTree},
     time::{self, ReplicaId},
-    util::Bias,
+    util::{log_async_errors, Bias},
 };
 use ::ignore::gitignore::Gitignore;
 use anyhow::{anyhow, Result};
@@ -76,7 +76,10 @@ impl Entity for Worktree {
 
     fn release(&mut self, cx: &mut MutableAppContext) {
         let rpc = match self {
-            Self::Local(tree) => tree.rpc.clone(),
+            Self::Local(tree) => tree
+                .share
+                .as_ref()
+                .map(|share| (share.rpc.clone(), share.remote_id)),
             Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
         };
 
@@ -226,6 +229,14 @@ impl Worktree {
                     .detach();
                 }
 
+                let _message_handlers = vec![
+                    rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer),
+                    rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer),
+                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update),
+                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer),
+                    rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved),
+                ];
+
                 Worktree::Remote(RemoteWorktree {
                     remote_id,
                     replica_id,
@@ -239,6 +250,7 @@ impl Worktree {
                         .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                         .collect(),
                     languages,
+                    _message_handlers,
                 })
             })
         });
@@ -289,10 +301,11 @@ impl Worktree {
         }
     }
 
-    pub fn add_peer(
+    pub fn handle_add_peer(
         &mut self,
-        envelope: TypedEnvelope<proto::AddPeer>,
-        cx: &mut ModelContext<Worktree>,
+        envelope: &TypedEnvelope<proto::AddPeer>,
+        _: rpc::Client,
+        cx: &mut ModelContext<Self>,
     ) -> Result<()> {
         match self {
             Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
@@ -300,10 +313,11 @@ impl Worktree {
         }
     }
 
-    pub fn remove_peer(
+    pub fn handle_remove_peer(
         &mut self,
-        envelope: TypedEnvelope<proto::RemovePeer>,
-        cx: &mut ModelContext<Worktree>,
+        envelope: &TypedEnvelope<proto::RemovePeer>,
+        _: rpc::Client,
+        cx: &mut ModelContext<Self>,
     ) -> Result<()> {
         match self {
             Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
@@ -311,6 +325,51 @@ impl Worktree {
         }
     }
 
+    pub fn handle_update(
+        &mut self,
+        envelope: &TypedEnvelope<proto::UpdateWorktree>,
+        _: rpc::Client,
+        cx: &mut ModelContext<Self>,
+    ) -> anyhow::Result<()> {
+        self.as_remote_mut()
+            .unwrap()
+            .update_from_remote(envelope, cx)
+    }
+
+    pub fn handle_open_buffer(
+        &mut self,
+        envelope: &TypedEnvelope<proto::OpenBuffer>,
+        rpc: rpc::Client,
+        cx: &mut ModelContext<Self>,
+    ) -> anyhow::Result<()> {
+        let receipt = envelope.receipt();
+
+        let response = self
+            .as_local_mut()
+            .unwrap()
+            .open_remote_buffer(envelope, cx);
+
+        cx.background()
+            .spawn(log_async_errors(async move {
+                rpc.respond(receipt, response.await?).await?;
+                Ok(())
+            }))
+            .detach();
+
+        Ok(())
+    }
+
+    pub fn handle_close_buffer(
+        &mut self,
+        envelope: &TypedEnvelope<proto::CloseBuffer>,
+        _: rpc::Client,
+        cx: &mut ModelContext<Self>,
+    ) -> anyhow::Result<()> {
+        self.as_local_mut()
+            .unwrap()
+            .close_remote_buffer(envelope, cx)
+    }
+
     pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
         match self {
             Worktree::Local(worktree) => &worktree.peers,
@@ -356,13 +415,15 @@ impl Worktree {
             .is_some()
     }
 
-    pub fn update_buffer(
+    pub fn handle_update_buffer(
         &mut self,
-        envelope: proto::UpdateBuffer,
+        envelope: &TypedEnvelope<proto::UpdateBuffer>,
+        _: rpc::Client,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let buffer_id = envelope.buffer_id as usize;
-        let ops = envelope
+        let payload = envelope.payload.clone();
+        let buffer_id = payload.buffer_id as usize;
+        let ops = payload
             .operations
             .into_iter()
             .map(|op| op.try_into())
@@ -401,34 +462,72 @@ impl Worktree {
         Ok(())
     }
 
-    pub fn buffer_saved(
+    pub fn handle_save_buffer(
         &mut self,
-        message: proto::BufferSaved,
+        envelope: &TypedEnvelope<proto::SaveBuffer>,
+        rpc: rpc::Client,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        if let Worktree::Remote(worktree) = self {
-            if let Some(buffer) = worktree
-                .open_buffers
-                .get(&(message.buffer_id as usize))
-                .and_then(|buf| buf.upgrade(cx))
-            {
-                buffer.update(cx, |buffer, cx| {
-                    let version = message.version.try_into()?;
-                    let mtime = message
-                        .mtime
-                        .ok_or_else(|| anyhow!("missing mtime"))?
-                        .into();
-                    buffer.did_save(version, mtime, cx);
-                    Result::<_, anyhow::Error>::Ok(())
-                })?;
-            }
-            Ok(())
-        } else {
-            Err(anyhow!(
-                "invalid buffer {} in buffer saved message",
-                message.buffer_id
-            ))
+        let sender_id = envelope.original_sender_id()?;
+        let buffer = self
+            .as_local()
+            .unwrap()
+            .shared_buffers
+            .get(&sender_id)
+            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
+
+        let receipt = envelope.receipt();
+        let worktree_id = envelope.payload.worktree_id;
+        let buffer_id = envelope.payload.buffer_id;
+        let save = buffer.update(cx, |buffer, cx| buffer.save(cx))?;
+
+        cx.background()
+            .spawn(log_async_errors(async move {
+                let (version, mtime) = save.await?;
+
+                rpc.respond(
+                    receipt,
+                    proto::BufferSaved {
+                        worktree_id,
+                        buffer_id,
+                        version: (&version).into(),
+                        mtime: Some(mtime.into()),
+                    },
+                )
+                .await?;
+
+                Ok(())
+            }))
+            .detach();
+
+        Ok(())
+    }
+
+    pub fn handle_buffer_saved(
+        &mut self,
+        envelope: &TypedEnvelope<proto::BufferSaved>,
+        _: rpc::Client,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        let payload = envelope.payload.clone();
+        let worktree = self.as_remote_mut().unwrap();
+        if let Some(buffer) = worktree
+            .open_buffers
+            .get(&(payload.buffer_id as usize))
+            .and_then(|buf| buf.upgrade(cx))
+        {
+            buffer.update(cx, |buffer, cx| {
+                let version = payload.version.try_into()?;
+                let mtime = payload
+                    .mtime
+                    .ok_or_else(|| anyhow!("missing mtime"))?
+                    .into();
+                buffer.did_save(version, mtime, cx);
+                Result::<_, anyhow::Error>::Ok(())
+            })?;
         }
+        Ok(())
     }
 
     fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
@@ -561,11 +660,10 @@ impl Deref for Worktree {
 pub struct LocalWorktree {
     snapshot: Snapshot,
     background_snapshot: Arc<Mutex<Snapshot>>,
-    snapshots_to_send_tx: Option<Sender<Snapshot>>,
     last_scan_state_rx: watch::Receiver<ScanState>,
     _background_scanner_task: Option<Task<()>>,
     poll_task: Option<Task<()>>,
-    rpc: Option<(rpc::Client, u64)>,
+    share: Option<ShareState>,
     open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
     shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
     peers: HashMap<PeerId, ReplicaId>,
@@ -619,14 +717,13 @@ impl LocalWorktree {
             let tree = Self {
                 snapshot: snapshot.clone(),
                 background_snapshot: Arc::new(Mutex::new(snapshot)),
-                snapshots_to_send_tx: None,
                 last_scan_state_rx,
                 _background_scanner_task: None,
+                share: None,
                 poll_task: None,
                 open_buffers: Default::default(),
                 shared_buffers: Default::default(),
                 peers: Default::default(),
-                rpc: None,
                 languages,
                 fs,
             };
@@ -639,10 +736,8 @@ impl LocalWorktree {
                             this.poll_snapshot(cx);
                             let tree = this.as_local_mut().unwrap();
                             if !tree.is_scanning() {
-                                if let Some(snapshots_to_send_tx) =
-                                    tree.snapshots_to_send_tx.clone()
-                                {
-                                    Some((tree.snapshot(), snapshots_to_send_tx))
+                                if let Some(share) = tree.share.as_ref() {
+                                    Some((tree.snapshot(), share.snapshots_tx.clone()))
                                 } else {
                                     None
                                 }
@@ -717,7 +812,7 @@ impl LocalWorktree {
 
     pub fn open_remote_buffer(
         &mut self,
-        envelope: TypedEnvelope<proto::OpenBuffer>,
+        envelope: &TypedEnvelope<proto::OpenBuffer>,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<Result<proto::OpenBufferResponse>> {
         let peer_id = envelope.original_sender_id();
@@ -744,7 +839,7 @@ impl LocalWorktree {
 
     pub fn close_remote_buffer(
         &mut self,
-        envelope: TypedEnvelope<proto::CloseBuffer>,
+        envelope: &TypedEnvelope<proto::CloseBuffer>,
         _: &mut ModelContext<Worktree>,
     ) -> Result<()> {
         if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
@@ -756,19 +851,24 @@ impl LocalWorktree {
 
     pub fn add_peer(
         &mut self,
-        envelope: TypedEnvelope<proto::AddPeer>,
+        envelope: &TypedEnvelope<proto::AddPeer>,
         cx: &mut ModelContext<Worktree>,
     ) -> Result<()> {
-        let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
+        let peer = envelope
+            .payload
+            .peer
+            .as_ref()
+            .ok_or_else(|| anyhow!("empty peer"))?;
         self.peers
             .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
         cx.notify();
+
         Ok(())
     }
 
     pub fn remove_peer(
         &mut self,
-        envelope: TypedEnvelope<proto::RemovePeer>,
+        envelope: &TypedEnvelope<proto::RemovePeer>,
         cx: &mut ModelContext<Worktree>,
     ) -> Result<()> {
         let peer_id = PeerId(envelope.payload.peer_id);
@@ -783,6 +883,7 @@ impl LocalWorktree {
             }
         }
         cx.notify();
+
         Ok(())
     }
 
@@ -892,6 +993,7 @@ impl LocalWorktree {
         cx.spawn(|this, mut cx| async move {
             let share_request = share_request.await;
             let share_response = rpc.request(share_request).await?;
+            let remote_id = share_response.worktree_id;
 
             rpc.state
                 .write()
@@ -906,11 +1008,10 @@ impl LocalWorktree {
             cx.background()
                 .spawn({
                     let rpc = rpc.clone();
-                    let worktree_id = share_response.worktree_id;
                     async move {
                         let mut prev_snapshot = snapshot;
                         while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
-                            let message = snapshot.build_update(&prev_snapshot, worktree_id);
+                            let message = snapshot.build_update(&prev_snapshot, remote_id);
                             match rpc.send(message).await {
                                 Ok(()) => prev_snapshot = snapshot,
                                 Err(err) => log::error!("error sending snapshot diff {}", err),
@@ -920,13 +1021,26 @@ impl LocalWorktree {
                 })
                 .detach();
 
-            this.update(&mut cx, |worktree, _| {
+            this.update(&mut cx, |worktree, cx| {
+                let _message_handlers = vec![
+                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer),
+                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer),
+                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer),
+                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer),
+                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer),
+                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer),
+                ];
+
                 let worktree = worktree.as_local_mut().unwrap();
-                worktree.rpc = Some((rpc, share_response.worktree_id));
-                worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx);
+                worktree.share = Some(ShareState {
+                    rpc,
+                    remote_id: share_response.worktree_id,
+                    snapshots_tx: snapshots_to_send_tx,
+                    _message_handlers,
+                });
             });
 
-            Ok((share_response.worktree_id, share_response.access_token))
+            Ok((remote_id, share_response.access_token))
         })
     }
 
@@ -978,6 +1092,13 @@ impl fmt::Debug for LocalWorktree {
     }
 }
 
+struct ShareState {
+    rpc: rpc::Client,
+    remote_id: u64,
+    snapshots_tx: Sender<Snapshot>,
+    _message_handlers: Vec<Task<()>>,
+}
+
 pub struct RemoteWorktree {
     remote_id: u64,
     snapshot: Snapshot,
@@ -988,6 +1109,7 @@ pub struct RemoteWorktree {
     open_buffers: HashMap<usize, RemoteBuffer>,
     peers: HashMap<PeerId, ReplicaId>,
     languages: Arc<LanguageRegistry>,
+    _message_handlers: Vec<Task<()>>,
 }
 
 impl RemoteWorktree {
@@ -1055,12 +1177,32 @@ impl RemoteWorktree {
         self.snapshot.clone()
     }
 
+    fn update_from_remote(
+        &mut self,
+        envelope: &TypedEnvelope<proto::UpdateWorktree>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Result<()> {
+        let mut tx = self.updates_tx.clone();
+        let payload = envelope.payload.clone();
+        cx.background()
+            .spawn(async move {
+                tx.send(payload).await.expect("receiver runs to completion");
+            })
+            .detach();
+
+        Ok(())
+    }
+
     pub fn add_peer(
         &mut self,
-        envelope: TypedEnvelope<proto::AddPeer>,
+        envelope: &TypedEnvelope<proto::AddPeer>,
         cx: &mut ModelContext<Worktree>,
     ) -> Result<()> {
-        let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
+        let peer = envelope
+            .payload
+            .peer
+            .as_ref()
+            .ok_or_else(|| anyhow!("empty peer"))?;
         self.peers
             .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
         cx.notify();
@@ -1069,7 +1211,7 @@ impl RemoteWorktree {
 
     pub fn remove_peer(
         &mut self,
-        envelope: TypedEnvelope<proto::RemovePeer>,
+        envelope: &TypedEnvelope<proto::RemovePeer>,
         cx: &mut ModelContext<Worktree>,
     ) -> Result<()> {
         let peer_id = PeerId(envelope.payload.peer_id);
@@ -1420,7 +1562,10 @@ impl File {
     pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
         self.worktree.update(cx, |worktree, cx| {
             if let Some((rpc, remote_id)) = match worktree {
-                Worktree::Local(worktree) => worktree.rpc.clone(),
+                Worktree::Local(worktree) => worktree
+                    .share
+                    .as_ref()
+                    .map(|share| (share.rpc.clone(), share.remote_id)),
                 Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
             } {
                 cx.spawn(|_, _| async move {
@@ -1497,9 +1642,12 @@ impl File {
     ) -> Task<Result<(time::Global, SystemTime)>> {
         self.worktree.update(cx, |worktree, cx| match worktree {
             Worktree::Local(worktree) => {
-                let rpc = worktree.rpc.clone();
+                let rpc = worktree
+                    .share
+                    .as_ref()
+                    .map(|share| (share.rpc.clone(), share.remote_id));
                 let save = worktree.save(self.path.clone(), text, cx);
-                cx.spawn(|_, _| async move {
+                cx.background().spawn(async move {
                     let entry = save.await?;
                     if let Some((rpc, worktree_id)) = rpc {
                         rpc.send(proto::BufferSaved {
@@ -1516,7 +1664,7 @@ impl File {
             Worktree::Remote(worktree) => {
                 let rpc = worktree.rpc.clone();
                 let worktree_id = worktree.remote_id;
-                cx.spawn(|_, _| async move {
+                cx.background().spawn(async move {
                     let response = rpc
                         .request(proto::SaveBuffer {
                             worktree_id,
@@ -2395,7 +2543,9 @@ mod remote {
             .read()
             .await
             .shared_worktree(envelope.payload.worktree_id, cx)?
-            .update(cx, |worktree, cx| worktree.add_peer(envelope, cx))
+            .update(cx, |worktree, cx| {
+                worktree.handle_add_peer(&envelope, rpc.clone(), cx)
+            })
     }
 
     pub async fn remove_peer(
@@ -2407,7 +2557,9 @@ mod remote {
             .read()
             .await
             .shared_worktree(envelope.payload.worktree_id, cx)?
-            .update(cx, |worktree, cx| worktree.remove_peer(envelope, cx))
+            .update(cx, |worktree, cx| {
+                worktree.handle_remove_peer(&envelope, rpc.clone(), cx)
+            })
     }
 
     pub async fn update_worktree(
@@ -2456,7 +2608,7 @@ mod remote {
                 worktree
                     .as_local_mut()
                     .unwrap()
-                    .open_remote_buffer(envelope, cx)
+                    .open_remote_buffer(&envelope, cx)
             })
             .await?;
 
@@ -2480,7 +2632,7 @@ mod remote {
             worktree
                 .as_local_mut()
                 .unwrap()
-                .close_remote_buffer(envelope, cx)
+                .close_remote_buffer(&envelope, cx)
         })
     }
 
@@ -2489,12 +2641,13 @@ mod remote {
         rpc: &rpc::Client,
         cx: &mut AsyncAppContext,
     ) -> anyhow::Result<()> {
-        let message = envelope.payload;
         rpc.state
             .read()
             .await
-            .shared_worktree(message.worktree_id, cx)?
-            .update(cx, |tree, cx| tree.update_buffer(message, cx))?;
+            .shared_worktree(envelope.payload.worktree_id, cx)?
+            .update(cx, |tree, cx| {
+                tree.handle_update_buffer(&envelope, rpc.clone(), cx)
+            })?;
         Ok(())
     }
 
@@ -2538,7 +2691,7 @@ mod remote {
             .await
             .shared_worktree(envelope.payload.worktree_id, cx)?
             .update(cx, |worktree, cx| {
-                worktree.buffer_saved(envelope.payload, cx)
+                worktree.handle_buffer_saved(&envelope, rpc.clone(), cx)
             })?;
         Ok(())
     }

zrpc/src/peer.rs 🔗

@@ -171,11 +171,10 @@ impl Peer {
                     }
                 } else {
                     router.handle(connection_id, envelope.clone()).await;
-                    match proto::build_typed_envelope(connection_id, envelope) {
-                        Ok(envelope) => {
-                            broadcast_incoming_messages.send(envelope).await.ok();
-                        }
-                        Err(error) => log::error!("{}", error),
+                    if let Some(envelope) = proto::build_typed_envelope(connection_id, envelope) {
+                        broadcast_incoming_messages.send(envelope).await.ok();
+                    } else {
+                        log::error!("unable to construct a typed envelope");
                     }
                 }
             }

zrpc/src/proto.rs 🔗

@@ -1,5 +1,5 @@
 use super::{ConnectionId, PeerId, TypedEnvelope};
-use anyhow::{anyhow, Result};
+use anyhow::Result;
 use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
 use futures::{SinkExt as _, StreamExt as _};
 use prost::Message;
@@ -24,127 +24,133 @@ pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
     fn from_envelope(envelope: Envelope) -> Option<Self>;
 }
 
+pub trait EntityMessage: EnvelopedMessage {
+    fn remote_entity_id(&self) -> u64;
+}
+
 pub trait RequestMessage: EnvelopedMessage {
     type Response: EnvelopedMessage;
 }
 
 macro_rules! messages {
-    ($($name:ident),*) => {
-        fn unicast_message_into_typed_envelope(sender_id: ConnectionId, envelope: &mut Envelope) -> Option<Arc<dyn Any + Send + Sync>> {
-            match &mut envelope.payload {
-                $(payload @ Some(envelope::Payload::$name(_)) => Some(Arc::new(TypedEnvelope {
+    ($($name:ident),* $(,)?) => {
+        pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Arc<dyn Any + Send + Sync>> {
+            match envelope.payload {
+                $(Some(envelope::Payload::$name(payload)) => Some(Arc::new(TypedEnvelope {
                     sender_id,
                     original_sender_id: envelope.original_sender_id.map(PeerId),
                     message_id: envelope.id,
-                    payload: payload.take().unwrap(),
+                    payload,
                 })), )*
                 _ => None
             }
         }
 
         $(
-            message!($name);
+            impl EnvelopedMessage for $name {
+                const NAME: &'static str = std::stringify!($name);
+
+                fn into_envelope(
+                    self,
+                    id: u32,
+                    responding_to: Option<u32>,
+                    original_sender_id: Option<u32>,
+                ) -> Envelope {
+                    Envelope {
+                        id,
+                        responding_to,
+                        original_sender_id,
+                        payload: Some(envelope::Payload::$name(self)),
+                    }
+                }
+
+                fn matches_envelope(envelope: &Envelope) -> bool {
+                    matches!(&envelope.payload, Some(envelope::Payload::$name(_)))
+                }
+
+                fn from_envelope(envelope: Envelope) -> Option<Self> {
+                    if let Some(envelope::Payload::$name(msg)) = envelope.payload {
+                        Some(msg)
+                    } else {
+                        None
+                    }
+                }
+            }
         )*
     };
 }
 
 macro_rules! request_messages {
-    ($(($request_name:ident, $response_name:ident)),*) => {
-        fn request_message_into_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Arc<dyn Any + Send + Sync>> {
-            match envelope.payload {
-                $(
-                    Some(envelope::Payload::$request_name(payload)) => Some(Arc::new(TypedEnvelope {
-                        sender_id,
-                        original_sender_id: envelope.original_sender_id.map(PeerId),
-                        message_id: envelope.id,
-                        payload,
-                    })),
-                    Some(envelope::Payload::$response_name(payload)) => Some(Arc::new(TypedEnvelope {
-                        sender_id,
-                        original_sender_id: envelope.original_sender_id.map(PeerId),
-                        message_id: envelope.id,
-                        payload,
-                    })),
-                )*
-                _ => None
-            }
-        }
-
-        $(
-            message!($request_name);
-            message!($response_name);
-        )*
-
+    ($(($request_name:ident, $response_name:ident)),* $(,)?) => {
         $(impl RequestMessage for $request_name {
             type Response = $response_name;
         })*
     };
 }
 
-macro_rules! message {
-    ($name:ident) => {
-        impl EnvelopedMessage for $name {
-            const NAME: &'static str = std::stringify!($name);
-
-            fn into_envelope(
-                self,
-                id: u32,
-                responding_to: Option<u32>,
-                original_sender_id: Option<u32>,
-            ) -> Envelope {
-                Envelope {
-                    id,
-                    responding_to,
-                    original_sender_id,
-                    payload: Some(envelope::Payload::$name(self)),
-                }
-            }
-
-            fn matches_envelope(envelope: &Envelope) -> bool {
-                matches!(&envelope.payload, Some(envelope::Payload::$name(_)))
-            }
-
-            fn from_envelope(envelope: Envelope) -> Option<Self> {
-                if let Some(envelope::Payload::$name(msg)) = envelope.payload {
-                    Some(msg)
-                } else {
-                    None
-                }
+macro_rules! entity_messages {
+    ($id_field:ident, $($name:ident),* $(,)?) => {
+        $(impl EntityMessage for $name {
+            fn remote_entity_id(&self) -> u64 {
+                self.$id_field
             }
-        }
+        })*
     };
 }
 
 messages!(
-    UpdateWorktree,
-    CloseWorktree,
-    CloseBuffer,
-    UpdateBuffer,
     AddPeer,
+    Auth,
+    AuthResponse,
+    BufferSaved,
+    ChannelMessageSent,
+    CloseBuffer,
+    CloseWorktree,
+    GetChannels,
+    GetChannelsResponse,
+    GetUsers,
+    GetUsersResponse,
+    JoinChannel,
+    JoinChannelResponse,
+    OpenBuffer,
+    OpenBufferResponse,
+    OpenWorktree,
+    OpenWorktreeResponse,
     RemovePeer,
+    SaveBuffer,
     SendChannelMessage,
-    ChannelMessageSent
+    ShareWorktree,
+    ShareWorktreeResponse,
+    UpdateBuffer,
+    UpdateWorktree,
 );
 
 request_messages!(
     (Auth, AuthResponse),
-    (ShareWorktree, ShareWorktreeResponse),
-    (OpenWorktree, OpenWorktreeResponse),
-    (OpenBuffer, OpenBufferResponse),
-    (SaveBuffer, BufferSaved),
     (GetChannels, GetChannelsResponse),
+    (GetUsers, GetUsersResponse),
     (JoinChannel, JoinChannelResponse),
-    (GetUsers, GetUsersResponse)
+    (OpenBuffer, OpenBufferResponse),
+    (OpenWorktree, OpenWorktreeResponse),
+    (SaveBuffer, BufferSaved),
+    (ShareWorktree, ShareWorktreeResponse),
 );
 
-pub fn build_typed_envelope(
-    sender_id: ConnectionId,
-    mut envelope: Envelope,
-) -> Result<Arc<dyn Any + Send + Sync>> {
-    unicast_message_into_typed_envelope(sender_id, &mut envelope)
-        .or_else(|| request_message_into_typed_envelope(sender_id, envelope))
-        .ok_or_else(|| anyhow!("unrecognized payload type"))
-}
+entity_messages!(
+    worktree_id,
+    AddPeer,
+    BufferSaved,
+    CloseBuffer,
+    CloseWorktree,
+    OpenBuffer,
+    OpenWorktree,
+    RemovePeer,
+    SaveBuffer,
+    UpdateBuffer,
+    UpdateWorktree,
+);
+
+entity_messages!(channel_id, ChannelMessageSent);
 
 /// A stream of protobuf messages.
 pub struct MessageStream<S> {