Revert "Wait for previous `UpdateFollowers` message ack before sending new ones"

Antonio Scandurra created

This reverts commit fe93263ad450a1460ccb5edfde1ca868d132e8c6.

Change summary

crates/collab/src/integration_tests.rs | 82 +++++++++------------------
crates/collab/src/rpc.rs               |  4 -
crates/rpc/src/proto.rs                |  1 
crates/workspace/src/workspace.rs      | 76 +++++++++----------------
4 files changed, 57 insertions(+), 106 deletions(-)

Detailed changes

crates/collab/src/integration_tests.rs 🔗

@@ -4672,7 +4672,7 @@ async fn test_following(
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    deterministic.forbid_parking();
+    cx_a.foreground().forbid_parking();
     cx_a.update(editor::init);
     cx_b.update(editor::init);
 
@@ -4791,14 +4791,11 @@ async fn test_following(
     workspace_a.update(cx_a, |workspace, cx| {
         workspace.activate_item(&editor_a1, cx)
     });
-    deterministic.run_until_parked();
-    assert_eq!(
-        workspace_b.read_with(cx_b, |workspace, cx| workspace
-            .active_item(cx)
-            .unwrap()
-            .id()),
-        editor_b1.id()
-    );
+    workspace_b
+        .condition(cx_b, |workspace, cx| {
+            workspace.active_item(cx).unwrap().id() == editor_b1.id()
+        })
+        .await;
 
     // When client A navigates back and forth, client B does so as well.
     workspace_a
@@ -4806,74 +4803,49 @@ async fn test_following(
             workspace::Pane::go_back(workspace, None, cx)
         })
         .await;
-    deterministic.run_until_parked();
-    assert_eq!(
-        workspace_b.read_with(cx_b, |workspace, cx| workspace
-            .active_item(cx)
-            .unwrap()
-            .id()),
-        editor_b2.id()
-    );
-
-    workspace_a
-        .update(cx_a, |workspace, cx| {
-            workspace::Pane::go_forward(workspace, None, cx)
+    workspace_b
+        .condition(cx_b, |workspace, cx| {
+            workspace.active_item(cx).unwrap().id() == editor_b2.id()
         })
         .await;
+
     workspace_a
         .update(cx_a, |workspace, cx| {
-            workspace::Pane::go_back(workspace, None, cx)
+            workspace::Pane::go_forward(workspace, None, cx)
         })
         .await;
-    workspace_a
-        .update(cx_a, |workspace, cx| {
-            workspace::Pane::go_forward(workspace, None, cx)
+    workspace_b
+        .condition(cx_b, |workspace, cx| {
+            workspace.active_item(cx).unwrap().id() == editor_b1.id()
         })
         .await;
-    deterministic.run_until_parked();
-    assert_eq!(
-        workspace_b.read_with(cx_b, |workspace, cx| workspace
-            .active_item(cx)
-            .unwrap()
-            .id()),
-        editor_b1.id()
-    );
 
     // Changes to client A's editor are reflected on client B.
     editor_a1.update(cx_a, |editor, cx| {
         editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
     });
-    deterministic.run_until_parked();
-    assert_eq!(
-        editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
-        vec![1..1, 2..2]
-    );
+    editor_b1
+        .condition(cx_b, |editor, cx| {
+            editor.selections.ranges(cx) == vec![1..1, 2..2]
+        })
+        .await;
 
     editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
-    deterministic.run_until_parked();
-    assert_eq!(
-        editor_b1.read_with(cx_b, |editor, cx| editor.text(cx)),
-        "TWO"
-    );
+    editor_b1
+        .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
+        .await;
 
     editor_a1.update(cx_a, |editor, cx| {
         editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
         editor.set_scroll_position(vec2f(0., 100.), cx);
     });
-    deterministic.run_until_parked();
-    assert_eq!(
-        editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
-        vec![3..3]
-    );
+    editor_b1
+        .condition(cx_b, |editor, cx| {
+            editor.selections.ranges(cx) == vec![3..3]
+        })
+        .await;
 
     // After unfollowing, client B stops receiving updates from client A.
-    assert_eq!(
-        workspace_b.read_with(cx_b, |workspace, cx| workspace
-            .active_item(cx)
-            .unwrap()
-            .id()),
-        editor_b1.id()
-    );
     workspace_b.update(cx_b, |workspace, cx| {
         workspace.unfollow(&workspace.active_pane().clone(), cx)
     });

crates/collab/src/rpc.rs 🔗

@@ -192,7 +192,7 @@ impl Server {
             .add_request_handler(Server::respond_to_contact_request)
             .add_request_handler(Server::follow)
             .add_message_handler(Server::unfollow)
-            .add_request_handler(Server::update_followers)
+            .add_message_handler(Server::update_followers)
             .add_message_handler(Server::update_diff_base)
             .add_request_handler(Server::get_private_user_info);
 
@@ -1437,7 +1437,6 @@ impl Server {
     async fn update_followers(
         self: Arc<Self>,
         request: Message<proto::UpdateFollowers>,
-        response: Response<proto::UpdateFollowers>,
     ) -> Result<()> {
         let project_id = ProjectId::from_proto(request.payload.project_id);
         let project_connection_ids = self
@@ -1465,7 +1464,6 @@ impl Server {
                 )?;
             }
         }
-        response.send(proto::Ack {})?;
         Ok(())
     }
 

crates/rpc/src/proto.rs 🔗

@@ -229,7 +229,6 @@ request_messages!(
     (Test, Test),
     (UpdateBuffer, Ack),
     (UpdateDiagnosticSummary, Ack),
-    (UpdateFollowers, Ack),
     (UpdateParticipantLocation, Ack),
     (UpdateProject, Ack),
     (UpdateWorktree, Ack),

crates/workspace/src/workspace.rs 🔗

@@ -18,10 +18,7 @@ use collections::{hash_map, HashMap, HashSet};
 use dock::{DefaultItemFactory, Dock, ToggleDockButton};
 use drag_and_drop::DragAndDrop;
 use fs::{self, Fs};
-use futures::{
-    channel::{mpsc, oneshot},
-    FutureExt, StreamExt,
-};
+use futures::{channel::oneshot, FutureExt, StreamExt};
 use gpui::{
     actions,
     elements::*,
@@ -714,13 +711,14 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
 
         if let Some(followed_item) = self.to_followable_item_handle(cx) {
             if let Some(message) = followed_item.to_state_proto(cx) {
-                workspace.update_followers(proto::update_followers::Variant::CreateView(
-                    proto::View {
+                workspace.update_followers(
+                    proto::update_followers::Variant::CreateView(proto::View {
                         id: followed_item.id() as u64,
                         variant: Some(message),
                         leader_id: workspace.leader_for_pane(&pane).map(|id| id.0),
-                    },
-                ));
+                    }),
+                    cx,
+                );
             }
         }
 
@@ -764,7 +762,7 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
                             cx.after_window_update({
                                 let pending_update = pending_update.clone();
                                 let pending_update_scheduled = pending_update_scheduled.clone();
-                                move |this, _| {
+                                move |this, cx| {
                                     pending_update_scheduled.store(false, SeqCst);
                                     this.update_followers(
                                         proto::update_followers::Variant::UpdateView(
@@ -774,6 +772,7 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
                                                 leader_id: leader_id.map(|id| id.0),
                                             },
                                         ),
+                                        cx,
                                     );
                                 }
                             });
@@ -1082,11 +1081,9 @@ pub struct Workspace {
     leader_state: LeaderState,
     follower_states_by_leader: FollowerStatesByLeader,
     last_leaders_by_pane: HashMap<WeakViewHandle<Pane>, PeerId>,
-    follower_updates: mpsc::UnboundedSender<proto::update_followers::Variant>,
     window_edited: bool,
     active_call: Option<(ModelHandle<ActiveCall>, Vec<gpui::Subscription>)>,
     _observe_current_user: Task<()>,
-    _update_followers: Task<Option<()>>,
 }
 
 #[derive(Default)]
@@ -1169,34 +1166,6 @@ impl Workspace {
             }
         });
 
-        let (follower_updates_tx, mut follower_updates_rx) = mpsc::unbounded();
-        let _update_followers = cx.spawn_weak(|this, cx| async move {
-            while let Some(update) = follower_updates_rx.next().await {
-                let this = this.upgrade(&cx)?;
-                let update_followers = this.read_with(&cx, |this, cx| {
-                    if let Some(project_id) = this.project.read(cx).remote_id() {
-                        if this.leader_state.followers.is_empty() {
-                            None
-                        } else {
-                            Some(this.client.request(proto::UpdateFollowers {
-                                project_id,
-                                follower_ids:
-                                    this.leader_state.followers.iter().map(|f| f.0).collect(),
-                                variant: Some(update),
-                            }))
-                        }
-                    } else {
-                        None
-                    }
-                });
-
-                if let Some(update_followers) = update_followers {
-                    update_followers.await.log_err();
-                }
-            }
-            None
-        });
-
         let handle = cx.handle();
         let weak_handle = cx.weak_handle();
 
@@ -1255,12 +1224,10 @@ impl Workspace {
             project,
             leader_state: Default::default(),
             follower_states_by_leader: Default::default(),
-            follower_updates: follower_updates_tx,
             last_leaders_by_pane: Default::default(),
             window_edited: false,
             active_call,
             _observe_current_user,
-            _update_followers,
         };
         this.project_remote_id_changed(this.project.read(cx).remote_id(), cx);
         cx.defer(|this, cx| this.update_window_title(cx));
@@ -2000,12 +1967,13 @@ impl Workspace {
             cx.notify();
         }
 
-        self.update_followers(proto::update_followers::Variant::UpdateActiveView(
-            proto::UpdateActiveView {
+        self.update_followers(
+            proto::update_followers::Variant::UpdateActiveView(proto::UpdateActiveView {
                 id: self.active_item(cx).map(|item| item.id() as u64),
                 leader_id: self.leader_for_pane(&pane).map(|id| id.0),
-            },
-        ));
+            }),
+            cx,
+        );
     }
 
     fn handle_pane_event(
@@ -2626,8 +2594,22 @@ impl Workspace {
         Ok(())
     }
 
-    fn update_followers(&self, update: proto::update_followers::Variant) {
-        let _ = self.follower_updates.unbounded_send(update);
+    fn update_followers(
+        &self,
+        update: proto::update_followers::Variant,
+        cx: &AppContext,
+    ) -> Option<()> {
+        let project_id = self.project.read(cx).remote_id()?;
+        if !self.leader_state.followers.is_empty() {
+            self.client
+                .send(proto::UpdateFollowers {
+                    project_id,
+                    follower_ids: self.leader_state.followers.iter().map(|f| f.0).collect(),
+                    variant: Some(update),
+                })
+                .log_err();
+        }
+        None
     }
 
     pub fn leader_for_pane(&self, pane: &ViewHandle<Pane>) -> Option<PeerId> {