Make `UpdateBuffer` a request, store unsent operations on worktree

Max Brunsfeld created

Change summary

server/src/rpc.rs   |  4 +++-
zed/src/worktree.rs | 19 +++++++++++++++----
zrpc/src/proto.rs   |  1 +
3 files changed, 19 insertions(+), 5 deletions(-)

Detailed changes

server/src/rpc.rs 🔗

@@ -499,7 +499,9 @@ impl Server {
         request: TypedEnvelope<proto::UpdateBuffer>,
     ) -> tide::Result<()> {
         self.broadcast_in_worktree(request.payload.worktree_id, &request)
-            .await
+            .await?;
+        self.peer.respond(request.receipt(), proto::Ack {}).await?;
+        Ok(())
     }
 
     async fn buffer_saved(

zed/src/worktree.rs 🔗

@@ -234,6 +234,7 @@ impl Worktree {
                         .into_iter()
                         .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                         .collect(),
+                    queued_operations: Default::default(),
                     languages,
                     _subscriptions,
                 })
@@ -656,6 +657,7 @@ pub struct LocalWorktree {
     shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
     peers: HashMap<PeerId, ReplicaId>,
     languages: Arc<LanguageRegistry>,
+    queued_operations: Vec<(u64, Operation)>,
     fs: Arc<dyn Fs>,
 }
 
@@ -711,6 +713,7 @@ impl LocalWorktree {
                 poll_task: None,
                 open_buffers: Default::default(),
                 shared_buffers: Default::default(),
+                queued_operations: Default::default(),
                 peers: Default::default(),
                 languages,
                 fs,
@@ -1091,6 +1094,7 @@ pub struct RemoteWorktree {
     open_buffers: HashMap<usize, RemoteBuffer>,
     peers: HashMap<PeerId, ReplicaId>,
     languages: Arc<LanguageRegistry>,
+    queued_operations: Vec<(u64, Operation)>,
     _subscriptions: Vec<rpc::Subscription>,
 }
 
@@ -1550,16 +1554,23 @@ impl File {
                     .map(|share| (share.rpc.clone(), share.remote_id)),
                 Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
             } {
-                cx.spawn(|_, _| async move {
+                cx.spawn(|worktree, mut cx| async move {
                     if let Err(error) = rpc
-                        .send(proto::UpdateBuffer {
+                        .request(proto::UpdateBuffer {
                             worktree_id: remote_id,
                             buffer_id,
-                            operations: Some(operation).iter().map(Into::into).collect(),
+                            operations: vec![(&operation).into()],
                         })
                         .await
                     {
-                        log::error!("error sending buffer operation: {}", error);
+                        worktree.update(&mut cx, |worktree, _| {
+                            log::error!("error sending buffer operation: {}", error);
+                            match worktree {
+                                Worktree::Local(t) => &mut t.queued_operations,
+                                Worktree::Remote(t) => &mut t.queued_operations,
+                            }
+                            .push((buffer_id, operation));
+                        });
                     }
                 })
                 .detach();

zrpc/src/proto.rs 🔗

@@ -159,6 +159,7 @@ request_messages!(
     (OpenWorktree, OpenWorktreeResponse),
     (Ping, Ack),
     (SaveBuffer, BufferSaved),
+    (UpdateBuffer, Ack),
     (ShareWorktree, ShareWorktreeResponse),
     (SendChannelMessage, SendChannelMessageResponse),
     (GetChannelMessages, GetChannelMessagesResponse),