From fe93263ad450a1460ccb5edfde1ca868d132e8c6 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 17 Nov 2022 14:12:00 +0100 Subject: [PATCH] Wait for previous `UpdateFollowers` message ack before sending new ones --- crates/collab/src/integration_tests.rs | 82 +++++++++++++++++--------- crates/collab/src/rpc.rs | 4 +- crates/rpc/src/proto.rs | 1 + crates/workspace/src/workspace.rs | 76 +++++++++++++++--------- 4 files changed, 106 insertions(+), 57 deletions(-) diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index d730b5d4e777640b3d3b643a31cb3b1225b195b6..511851002443aada5f27a0c7f7508c5bd560e0b5 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -4672,7 +4672,7 @@ async fn test_following( cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); + deterministic.forbid_parking(); cx_a.update(editor::init); cx_b.update(editor::init); @@ -4791,11 +4791,14 @@ async fn test_following( workspace_a.update(cx_a, |workspace, cx| { workspace.activate_item(&editor_a1, cx) }); - workspace_b - .condition(cx_b, |workspace, cx| { - workspace.active_item(cx).unwrap().id() == editor_b1.id() - }) - .await; + deterministic.run_until_parked(); + assert_eq!( + workspace_b.read_with(cx_b, |workspace, cx| workspace + .active_item(cx) + .unwrap() + .id()), + editor_b1.id() + ); // When client A navigates back and forth, client B does so as well. workspace_a @@ -4803,49 +4806,74 @@ async fn test_following( workspace::Pane::go_back(workspace, None, cx) }) .await; - workspace_b - .condition(cx_b, |workspace, cx| { - workspace.active_item(cx).unwrap().id() == editor_b2.id() - }) - .await; + deterministic.run_until_parked(); + assert_eq!( + workspace_b.read_with(cx_b, |workspace, cx| workspace + .active_item(cx) + .unwrap() + .id()), + editor_b2.id() + ); workspace_a .update(cx_a, |workspace, cx| { workspace::Pane::go_forward(workspace, None, cx) }) .await; - workspace_b - .condition(cx_b, |workspace, cx| { - workspace.active_item(cx).unwrap().id() == editor_b1.id() + workspace_a + .update(cx_a, |workspace, cx| { + workspace::Pane::go_back(workspace, None, cx) + }) + .await; + workspace_a + .update(cx_a, |workspace, cx| { + workspace::Pane::go_forward(workspace, None, cx) }) .await; + deterministic.run_until_parked(); + assert_eq!( + workspace_b.read_with(cx_b, |workspace, cx| workspace + .active_item(cx) + .unwrap() + .id()), + editor_b1.id() + ); // Changes to client A's editor are reflected on client B. editor_a1.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2])); }); - editor_b1 - .condition(cx_b, |editor, cx| { - editor.selections.ranges(cx) == vec![1..1, 2..2] - }) - .await; + deterministic.run_until_parked(); + assert_eq!( + editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)), + vec![1..1, 2..2] + ); editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx)); - editor_b1 - .condition(cx_b, |editor, cx| editor.text(cx) == "TWO") - .await; + deterministic.run_until_parked(); + assert_eq!( + editor_b1.read_with(cx_b, |editor, cx| editor.text(cx)), + "TWO" + ); editor_a1.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([3..3])); editor.set_scroll_position(vec2f(0., 100.), cx); }); - editor_b1 - .condition(cx_b, |editor, cx| { - editor.selections.ranges(cx) == vec![3..3] - }) - .await; + deterministic.run_until_parked(); + assert_eq!( + editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)), + vec![3..3] + ); // After unfollowing, client B stops receiving updates from client A. + assert_eq!( + workspace_b.read_with(cx_b, |workspace, cx| workspace + .active_item(cx) + .unwrap() + .id()), + editor_b1.id() + ); workspace_b.update(cx_b, |workspace, cx| { workspace.unfollow(&workspace.active_pane().clone(), cx) }); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 70419623ef1ee18a49401953ec9cf4d2b47e2bb2..a07a8b37c870a3d070b840dc59c96a23a27c2087 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -192,7 +192,7 @@ impl Server { .add_request_handler(Server::respond_to_contact_request) .add_request_handler(Server::follow) .add_message_handler(Server::unfollow) - .add_message_handler(Server::update_followers) + .add_request_handler(Server::update_followers) .add_message_handler(Server::update_diff_base) .add_request_handler(Server::get_private_user_info); @@ -1437,6 +1437,7 @@ impl Server { async fn update_followers( self: Arc, request: Message, + response: Response, ) -> Result<()> { let project_id = ProjectId::from_proto(request.payload.project_id); let project_connection_ids = self @@ -1464,6 +1465,7 @@ impl Server { )?; } } + response.send(proto::Ack {})?; Ok(()) } diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 50f3c57f2a6b3c5bd9bc6798e468df7a541a2f07..8a59818fa3d2bb95423465014456901daa945897 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -229,6 +229,7 @@ request_messages!( (Test, Test), (UpdateBuffer, Ack), (UpdateDiagnosticSummary, Ack), + (UpdateFollowers, Ack), (UpdateParticipantLocation, Ack), (UpdateProject, Ack), (UpdateWorktree, Ack), diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 2296741ed3c7f31768c2bd5857a463e18179c4fe..5f14427feea53cc1b19e2674eabf374b9d4254be 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -18,7 +18,10 @@ use collections::{hash_map, HashMap, HashSet}; use dock::{DefaultItemFactory, Dock, ToggleDockButton}; use drag_and_drop::DragAndDrop; use fs::{self, Fs}; -use futures::{channel::oneshot, FutureExt, StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + FutureExt, StreamExt, +}; use gpui::{ actions, elements::*, @@ -711,14 +714,13 @@ impl ItemHandle for ViewHandle { if let Some(followed_item) = self.to_followable_item_handle(cx) { if let Some(message) = followed_item.to_state_proto(cx) { - workspace.update_followers( - proto::update_followers::Variant::CreateView(proto::View { + workspace.update_followers(proto::update_followers::Variant::CreateView( + proto::View { id: followed_item.id() as u64, variant: Some(message), leader_id: workspace.leader_for_pane(&pane).map(|id| id.0), - }), - cx, - ); + }, + )); } } @@ -762,7 +764,7 @@ impl ItemHandle for ViewHandle { cx.after_window_update({ let pending_update = pending_update.clone(); let pending_update_scheduled = pending_update_scheduled.clone(); - move |this, cx| { + move |this, _| { pending_update_scheduled.store(false, SeqCst); this.update_followers( proto::update_followers::Variant::UpdateView( @@ -772,7 +774,6 @@ impl ItemHandle for ViewHandle { leader_id: leader_id.map(|id| id.0), }, ), - cx, ); } }); @@ -1081,9 +1082,11 @@ pub struct Workspace { leader_state: LeaderState, follower_states_by_leader: FollowerStatesByLeader, last_leaders_by_pane: HashMap, PeerId>, + follower_updates: mpsc::UnboundedSender, window_edited: bool, active_call: Option<(ModelHandle, Vec)>, _observe_current_user: Task<()>, + _update_followers: Task>, } #[derive(Default)] @@ -1166,6 +1169,34 @@ impl Workspace { } }); + let (follower_updates_tx, mut follower_updates_rx) = mpsc::unbounded(); + let _update_followers = cx.spawn_weak(|this, cx| async move { + while let Some(update) = follower_updates_rx.next().await { + let this = this.upgrade(&cx)?; + let update_followers = this.read_with(&cx, |this, cx| { + if let Some(project_id) = this.project.read(cx).remote_id() { + if this.leader_state.followers.is_empty() { + None + } else { + Some(this.client.request(proto::UpdateFollowers { + project_id, + follower_ids: + this.leader_state.followers.iter().map(|f| f.0).collect(), + variant: Some(update), + })) + } + } else { + None + } + }); + + if let Some(update_followers) = update_followers { + update_followers.await.log_err(); + } + } + None + }); + let handle = cx.handle(); let weak_handle = cx.weak_handle(); @@ -1224,10 +1255,12 @@ impl Workspace { project, leader_state: Default::default(), follower_states_by_leader: Default::default(), + follower_updates: follower_updates_tx, last_leaders_by_pane: Default::default(), window_edited: false, active_call, _observe_current_user, + _update_followers, }; this.project_remote_id_changed(this.project.read(cx).remote_id(), cx); cx.defer(|this, cx| this.update_window_title(cx)); @@ -1967,13 +2000,12 @@ impl Workspace { cx.notify(); } - self.update_followers( - proto::update_followers::Variant::UpdateActiveView(proto::UpdateActiveView { + self.update_followers(proto::update_followers::Variant::UpdateActiveView( + proto::UpdateActiveView { id: self.active_item(cx).map(|item| item.id() as u64), leader_id: self.leader_for_pane(&pane).map(|id| id.0), - }), - cx, - ); + }, + )); } fn handle_pane_event( @@ -2594,22 +2626,8 @@ impl Workspace { Ok(()) } - fn update_followers( - &self, - update: proto::update_followers::Variant, - cx: &AppContext, - ) -> Option<()> { - let project_id = self.project.read(cx).remote_id()?; - if !self.leader_state.followers.is_empty() { - self.client - .send(proto::UpdateFollowers { - project_id, - follower_ids: self.leader_state.followers.iter().map(|f| f.0).collect(), - variant: Some(update), - }) - .log_err(); - } - None + fn update_followers(&self, update: proto::update_followers::Variant) { + let _ = self.follower_updates.unbounded_send(update); } pub fn leader_for_pane(&self, pane: &ViewHandle) -> Option {