Wait for host to acknowledge buffer updates before sending them to other guests

Max Brunsfeld created

Change summary

crates/collab/src/rpc.rs      | 33 ++++++++++++++++++++++++++++-----
crates/project/src/project.rs |  6 +++---
crates/rpc/src/rpc.rs         |  2 +-
3 files changed, 32 insertions(+), 9 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -1655,17 +1655,40 @@ async fn update_buffer(
 ) -> Result<()> {
     session.executor.record_backtrace();
     let project_id = ProjectId::from_proto(request.project_id);
-    let project_connection_ids = session
+    let host_connection_id = {
+        let collaborators = session
+            .db()
+            .await
+            .project_collaborators(project_id, session.connection_id)
+            .await?;
+
+        let host = collaborators
+            .iter()
+            .find(|collaborator| collaborator.is_host)
+            .ok_or_else(|| anyhow!("host not found"))?;
+        host.connection_id
+    };
+
+    if host_connection_id != session.connection_id {
+        session
+            .peer
+            .forward_request(session.connection_id, host_connection_id, request.clone())
+            .await?;
+    }
+
+    session.executor.record_backtrace();
+    let collaborators = session
         .db()
         .await
-        .project_connection_ids(project_id, session.connection_id)
+        .project_collaborators(project_id, session.connection_id)
         .await?;
 
-    session.executor.record_backtrace();
-
     broadcast(
         Some(session.connection_id),
-        project_connection_ids.iter().copied(),
+        collaborators
+            .iter()
+            .filter(|collaborator| !collaborator.is_host)
+            .map(|collaborator| collaborator.connection_id),
         |connection_id| {
             session
                 .peer

crates/project/src/project.rs 🔗

@@ -380,7 +380,7 @@ impl Project {
         client.add_model_message_handler(Self::handle_unshare_project);
         client.add_model_message_handler(Self::handle_create_buffer_for_peer);
         client.add_model_message_handler(Self::handle_update_buffer_file);
-        client.add_model_message_handler(Self::handle_update_buffer);
+        client.add_model_request_handler(Self::handle_update_buffer);
         client.add_model_message_handler(Self::handle_update_diagnostic_summary);
         client.add_model_message_handler(Self::handle_update_worktree);
         client.add_model_request_handler(Self::handle_create_project_entry);
@@ -5160,7 +5160,7 @@ impl Project {
         envelope: TypedEnvelope<proto::UpdateBuffer>,
         _: Arc<Client>,
         mut cx: AsyncAppContext,
-    ) -> Result<()> {
+    ) -> Result<proto::Ack> {
         this.update(&mut cx, |this, cx| {
             let payload = envelope.payload.clone();
             let buffer_id = payload.buffer_id;
@@ -5187,7 +5187,7 @@ impl Project {
                     e.insert(OpenBuffer::Operations(ops));
                 }
             }
-            Ok(())
+            Ok(proto::Ack {})
         })
     }
 

crates/rpc/src/rpc.rs 🔗

@@ -6,4 +6,4 @@ pub use conn::Connection;
 pub use peer::*;
 mod macros;
 
-pub const PROTOCOL_VERSION: u32 = 50;
+pub const PROTOCOL_VERSION: u32 = 51;