Wait for previous `UpdateFollowers` message ack before sending new ones

Antonio Scandurra created

Change summary

crates/collab/src/integration_tests.rs | 82 ++++++++++++++++++---------
crates/collab/src/rpc.rs               |  4 +
crates/rpc/src/proto.rs                |  1 
crates/workspace/src/workspace.rs      | 76 ++++++++++++++++---------
4 files changed, 106 insertions(+), 57 deletions(-)

Detailed changes

crates/collab/src/integration_tests.rs 🔗

@@ -4672,7 +4672,7 @@ async fn test_following(
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    cx_a.foreground().forbid_parking();
+    deterministic.forbid_parking();
     cx_a.update(editor::init);
     cx_b.update(editor::init);
 
@@ -4791,11 +4791,14 @@ async fn test_following(
     workspace_a.update(cx_a, |workspace, cx| {
         workspace.activate_item(&editor_a1, cx)
     });
-    workspace_b
-        .condition(cx_b, |workspace, cx| {
-            workspace.active_item(cx).unwrap().id() == editor_b1.id()
-        })
-        .await;
+    deterministic.run_until_parked();
+    assert_eq!(
+        workspace_b.read_with(cx_b, |workspace, cx| workspace
+            .active_item(cx)
+            .unwrap()
+            .id()),
+        editor_b1.id()
+    );
 
     // When client A navigates back and forth, client B does so as well.
     workspace_a
@@ -4803,49 +4806,74 @@ async fn test_following(
             workspace::Pane::go_back(workspace, None, cx)
         })
         .await;
-    workspace_b
-        .condition(cx_b, |workspace, cx| {
-            workspace.active_item(cx).unwrap().id() == editor_b2.id()
-        })
-        .await;
+    deterministic.run_until_parked();
+    assert_eq!(
+        workspace_b.read_with(cx_b, |workspace, cx| workspace
+            .active_item(cx)
+            .unwrap()
+            .id()),
+        editor_b2.id()
+    );
 
     workspace_a
         .update(cx_a, |workspace, cx| {
             workspace::Pane::go_forward(workspace, None, cx)
         })
         .await;
-    workspace_b
-        .condition(cx_b, |workspace, cx| {
-            workspace.active_item(cx).unwrap().id() == editor_b1.id()
+    workspace_a
+        .update(cx_a, |workspace, cx| {
+            workspace::Pane::go_back(workspace, None, cx)
+        })
+        .await;
+    workspace_a
+        .update(cx_a, |workspace, cx| {
+            workspace::Pane::go_forward(workspace, None, cx)
         })
         .await;
+    deterministic.run_until_parked();
+    assert_eq!(
+        workspace_b.read_with(cx_b, |workspace, cx| workspace
+            .active_item(cx)
+            .unwrap()
+            .id()),
+        editor_b1.id()
+    );
 
     // Changes to client A's editor are reflected on client B.
     editor_a1.update(cx_a, |editor, cx| {
         editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
     });
-    editor_b1
-        .condition(cx_b, |editor, cx| {
-            editor.selections.ranges(cx) == vec![1..1, 2..2]
-        })
-        .await;
+    deterministic.run_until_parked();
+    assert_eq!(
+        editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
+        vec![1..1, 2..2]
+    );
 
     editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
-    editor_b1
-        .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
-        .await;
+    deterministic.run_until_parked();
+    assert_eq!(
+        editor_b1.read_with(cx_b, |editor, cx| editor.text(cx)),
+        "TWO"
+    );
 
     editor_a1.update(cx_a, |editor, cx| {
         editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
         editor.set_scroll_position(vec2f(0., 100.), cx);
     });
-    editor_b1
-        .condition(cx_b, |editor, cx| {
-            editor.selections.ranges(cx) == vec![3..3]
-        })
-        .await;
+    deterministic.run_until_parked();
+    assert_eq!(
+        editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
+        vec![3..3]
+    );
 
     // After unfollowing, client B stops receiving updates from client A.
+    assert_eq!(
+        workspace_b.read_with(cx_b, |workspace, cx| workspace
+            .active_item(cx)
+            .unwrap()
+            .id()),
+        editor_b1.id()
+    );
     workspace_b.update(cx_b, |workspace, cx| {
         workspace.unfollow(&workspace.active_pane().clone(), cx)
     });

crates/collab/src/rpc.rs 🔗

@@ -192,7 +192,7 @@ impl Server {
             .add_request_handler(Server::respond_to_contact_request)
             .add_request_handler(Server::follow)
             .add_message_handler(Server::unfollow)
-            .add_message_handler(Server::update_followers)
+            .add_request_handler(Server::update_followers)
             .add_message_handler(Server::update_diff_base)
             .add_request_handler(Server::get_private_user_info);
 
@@ -1437,6 +1437,7 @@ impl Server {
     async fn update_followers(
         self: Arc<Self>,
         request: Message<proto::UpdateFollowers>,
+        response: Response<proto::UpdateFollowers>,
     ) -> Result<()> {
         let project_id = ProjectId::from_proto(request.payload.project_id);
         let project_connection_ids = self
@@ -1464,6 +1465,7 @@ impl Server {
                 )?;
             }
         }
+        response.send(proto::Ack {})?;
         Ok(())
     }
 

crates/rpc/src/proto.rs 🔗

@@ -229,6 +229,7 @@ request_messages!(
     (Test, Test),
     (UpdateBuffer, Ack),
     (UpdateDiagnosticSummary, Ack),
+    (UpdateFollowers, Ack),
     (UpdateParticipantLocation, Ack),
     (UpdateProject, Ack),
     (UpdateWorktree, Ack),

crates/workspace/src/workspace.rs 🔗

@@ -18,7 +18,10 @@ use collections::{hash_map, HashMap, HashSet};
 use dock::{DefaultItemFactory, Dock, ToggleDockButton};
 use drag_and_drop::DragAndDrop;
 use fs::{self, Fs};
-use futures::{channel::oneshot, FutureExt, StreamExt};
+use futures::{
+    channel::{mpsc, oneshot},
+    FutureExt, StreamExt,
+};
 use gpui::{
     actions,
     elements::*,
@@ -711,14 +714,13 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
 
         if let Some(followed_item) = self.to_followable_item_handle(cx) {
             if let Some(message) = followed_item.to_state_proto(cx) {
-                workspace.update_followers(
-                    proto::update_followers::Variant::CreateView(proto::View {
+                workspace.update_followers(proto::update_followers::Variant::CreateView(
+                    proto::View {
                         id: followed_item.id() as u64,
                         variant: Some(message),
                         leader_id: workspace.leader_for_pane(&pane).map(|id| id.0),
-                    }),
-                    cx,
-                );
+                    },
+                ));
             }
         }
 
@@ -762,7 +764,7 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
                             cx.after_window_update({
                                 let pending_update = pending_update.clone();
                                 let pending_update_scheduled = pending_update_scheduled.clone();
-                                move |this, cx| {
+                                move |this, _| {
                                     pending_update_scheduled.store(false, SeqCst);
                                     this.update_followers(
                                         proto::update_followers::Variant::UpdateView(
@@ -772,7 +774,6 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
                                                 leader_id: leader_id.map(|id| id.0),
                                             },
                                         ),
-                                        cx,
                                     );
                                 }
                             });
@@ -1081,9 +1082,11 @@ pub struct Workspace {
     leader_state: LeaderState,
     follower_states_by_leader: FollowerStatesByLeader,
     last_leaders_by_pane: HashMap<WeakViewHandle<Pane>, PeerId>,
+    follower_updates: mpsc::UnboundedSender<proto::update_followers::Variant>,
     window_edited: bool,
     active_call: Option<(ModelHandle<ActiveCall>, Vec<gpui::Subscription>)>,
     _observe_current_user: Task<()>,
+    _update_followers: Task<Option<()>>,
 }
 
 #[derive(Default)]
@@ -1166,6 +1169,34 @@ impl Workspace {
             }
         });
 
+        let (follower_updates_tx, mut follower_updates_rx) = mpsc::unbounded();
+        let _update_followers = cx.spawn_weak(|this, cx| async move {
+            while let Some(update) = follower_updates_rx.next().await {
+                let this = this.upgrade(&cx)?;
+                let update_followers = this.read_with(&cx, |this, cx| {
+                    if let Some(project_id) = this.project.read(cx).remote_id() {
+                        if this.leader_state.followers.is_empty() {
+                            None
+                        } else {
+                            Some(this.client.request(proto::UpdateFollowers {
+                                project_id,
+                                follower_ids:
+                                    this.leader_state.followers.iter().map(|f| f.0).collect(),
+                                variant: Some(update),
+                            }))
+                        }
+                    } else {
+                        None
+                    }
+                });
+
+                if let Some(update_followers) = update_followers {
+                    update_followers.await.log_err();
+                }
+            }
+            None
+        });
+
         let handle = cx.handle();
         let weak_handle = cx.weak_handle();
 
@@ -1224,10 +1255,12 @@ impl Workspace {
             project,
             leader_state: Default::default(),
             follower_states_by_leader: Default::default(),
+            follower_updates: follower_updates_tx,
             last_leaders_by_pane: Default::default(),
             window_edited: false,
             active_call,
             _observe_current_user,
+            _update_followers,
         };
         this.project_remote_id_changed(this.project.read(cx).remote_id(), cx);
         cx.defer(|this, cx| this.update_window_title(cx));
@@ -1967,13 +2000,12 @@ impl Workspace {
             cx.notify();
         }
 
-        self.update_followers(
-            proto::update_followers::Variant::UpdateActiveView(proto::UpdateActiveView {
+        self.update_followers(proto::update_followers::Variant::UpdateActiveView(
+            proto::UpdateActiveView {
                 id: self.active_item(cx).map(|item| item.id() as u64),
                 leader_id: self.leader_for_pane(&pane).map(|id| id.0),
-            }),
-            cx,
-        );
+            },
+        ));
     }
 
     fn handle_pane_event(
@@ -2594,22 +2626,8 @@ impl Workspace {
         Ok(())
     }
 
-    fn update_followers(
-        &self,
-        update: proto::update_followers::Variant,
-        cx: &AppContext,
-    ) -> Option<()> {
-        let project_id = self.project.read(cx).remote_id()?;
-        if !self.leader_state.followers.is_empty() {
-            self.client
-                .send(proto::UpdateFollowers {
-                    project_id,
-                    follower_ids: self.leader_state.followers.iter().map(|f| f.0).collect(),
-                    variant: Some(update),
-                })
-                .log_err();
-        }
-        None
+    fn update_followers(&self, update: proto::update_followers::Variant) {
+        let _ = self.follower_updates.unbounded_send(update);
     }
 
     pub fn leader_for_pane(&self, pane: &ViewHandle<Pane>) -> Option<PeerId> {