From 3afc9d8607dc3803e67ff5c2ce9c7853ab45abab Mon Sep 17 00:00:00 2001 From: "gcp-cherry-pick-bot[bot]" <98988430+gcp-cherry-pick-bot[bot]@users.noreply.github.com> Date: Wed, 6 Mar 2024 15:41:19 +0100 Subject: [PATCH] Throttle the sending of UpdateFollowers messages (cherry-pick #8918) (#8946) Cherry-picked Throttle the sending of UpdateFollowers messages (#8918) ## Problem We're trying to figure out why we sometimes see high latency when collaborating, even though the collab server logs indicate that messages are not taking long to process. We think that high volumes of certain types of messages, including `UpdateFollowers` may cause a lot of messages to queue up, causing delays before collab sees certain messages. ## Fix This PR reduces the number of `UpdateFollowers` messages that clients send to collab when scrolling around or moving the cursor, using a time-based throttle. The downside of this change is that scrolling will not be as smooth when following someone. The advantage is that it will be much easier to keep up with the stream of updates, since they will be sent much less frequently. ## Release Notes: - Fixed slowness that could occur when collaborating due to excessive messages being sent to support following. --------- Co-authored-by: Nathan Co-authored-by: Conrad Co-authored-by: Antonio Scandurra Co-authored-by: Thorsten Co-authored-by: Thorsten Ball Co-authored-by: Max Brunsfeld Co-authored-by: Nathan Co-authored-by: Conrad Co-authored-by: Antonio Scandurra Co-authored-by: Thorsten Co-authored-by: Thorsten Ball --- Cargo.lock | 1 + crates/collab/src/tests/following_tests.rs | 9 ++ crates/gpui/src/test.rs | 2 +- crates/picker/Cargo.toml | 1 + crates/picker/src/picker.rs | 41 +++++++-- crates/workspace/src/item.rs | 98 ++++++++++++++-------- crates/workspace/src/workspace.rs | 2 +- 7 files changed, 109 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2154d4e01ddb38cb6580d0d03ff1b628550c8ec0..5b461b533e73c733907520f9813d88780f94fe36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6374,6 +6374,7 @@ dependencies = [ name = "picker" version = "0.1.0" dependencies = [ + "anyhow", "ctor", "editor", "env_logger", diff --git a/crates/collab/src/tests/following_tests.rs b/crates/collab/src/tests/following_tests.rs index 7c18d020cba609a1e82e4bedc0a34629a6400de3..9765a44e279c65d0c22b65254648ec8f3a9399b9 100644 --- a/crates/collab/src/tests/following_tests.rs +++ b/crates/collab/src/tests/following_tests.rs @@ -373,8 +373,10 @@ async fn test_basic_following( editor_a1.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2])); }); + executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); executor.run_until_parked(); cx_b.background_executor.run_until_parked(); + editor_b1.update(cx_b, |editor, cx| { assert_eq!(editor.selections.ranges(cx), &[1..1, 2..2]); }); @@ -387,6 +389,7 @@ async fn test_basic_following( editor.change_selections(None, cx, |s| s.select_ranges([3..3])); editor.set_scroll_position(point(0., 100.), cx); }); + executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); executor.run_until_parked(); editor_b1.update(cx_b, |editor, cx| { assert_eq!(editor.selections.ranges(cx), &[3..3]); @@ -1600,6 +1603,8 @@ async fn test_following_stops_on_unshare(cx_a: &mut TestAppContext, cx_b: &mut T editor_a.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([1..1])) }); + cx_a.executor() + .advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); cx_a.run_until_parked(); editor_b.update(cx_b, |editor, cx| { assert_eq!(editor.selections.ranges(cx), vec![1..1]) @@ -1618,6 +1623,8 @@ async fn test_following_stops_on_unshare(cx_a: &mut TestAppContext, cx_b: &mut T editor_a.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([2..2])) }); + cx_a.executor() + .advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); cx_a.run_until_parked(); editor_b.update(cx_b, |editor, cx| { assert_eq!(editor.selections.ranges(cx), vec![1..1]) @@ -1722,6 +1729,7 @@ async fn test_following_into_excluded_file( // When client B starts following client A, currently visible file is replicated workspace_b.update(cx_b, |workspace, cx| workspace.follow(peer_id_a, cx)); + executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); executor.run_until_parked(); let editor_for_excluded_b = workspace_b.update(cx_b, |workspace, cx| { @@ -1743,6 +1751,7 @@ async fn test_following_into_excluded_file( editor_for_excluded_a.update(cx_a, |editor, cx| { editor.select_right(&Default::default(), cx); }); + executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); executor.run_until_parked(); // Changes from B to the excluded file are replicated in A's editor diff --git a/crates/gpui/src/test.rs b/crates/gpui/src/test.rs index 77540704f9c9e2543b102a3ee352570b144d62f2..7793f0261a8d50cf808064e02703cb0829d0c1c8 100644 --- a/crates/gpui/src/test.rs +++ b/crates/gpui/src/test.rs @@ -46,10 +46,10 @@ pub fn run_test( let starting_seed = env::var("SEED") .map(|seed| seed.parse().expect("invalid SEED variable")) .unwrap_or(0); - let is_randomized = num_iterations > 1; if let Ok(iterations) = env::var("ITERATIONS") { num_iterations = iterations.parse().expect("invalid ITERATIONS variable"); } + let is_randomized = num_iterations > 1; for seed in starting_seed..starting_seed + num_iterations { let mut retry = 0; diff --git a/crates/picker/Cargo.toml b/crates/picker/Cargo.toml index 510ce14a594fc1162a5a39c2eb272516382353ff..f470ad22a205f7fab9e4188bd379e7ae6a69f941 100644 --- a/crates/picker/Cargo.toml +++ b/crates/picker/Cargo.toml @@ -10,6 +10,7 @@ path = "src/picker.rs" doctest = false [dependencies] +anyhow.workspace = true editor.workspace = true gpui.workspace = true menu.workspace = true diff --git a/crates/picker/src/picker.rs b/crates/picker/src/picker.rs index 55ee34003599d5454f6a535ec63b4424ad32072e..17279414160608bd7eeb26fb8de04c966dce1877 100644 --- a/crates/picker/src/picker.rs +++ b/crates/picker/src/picker.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use editor::Editor; use gpui::{ div, list, prelude::*, uniform_list, AnyElement, AppContext, ClickEvent, DismissEvent, @@ -13,11 +14,16 @@ enum ElementContainer { UniformList(UniformListScrollHandle), } +struct PendingUpdateMatches { + delegate_update_matches: Option>, + _task: Task>, +} + pub struct Picker { pub delegate: D, element_container: ElementContainer, editor: View, - pending_update_matches: Option>, + pending_update_matches: Option, confirm_on_update: Option, width: Option, max_height: Option, @@ -268,15 +274,32 @@ impl Picker { } pub fn update_matches(&mut self, query: String, cx: &mut ViewContext) { - let update = self.delegate.update_matches(query, cx); + let delegate_pending_update_matches = self.delegate.update_matches(query, cx); + self.matches_updated(cx); - self.pending_update_matches = Some(cx.spawn(|this, mut cx| async move { - update.await; - this.update(&mut cx, |this, cx| { - this.matches_updated(cx); - }) - .ok(); - })); + // This struct ensures that we can synchronously drop the task returned by the + // delegate's `update_matches` method and the task that the picker is spawning. + // If we simply capture the delegate's task into the picker's task, when the picker's + // task gets synchronously dropped, the delegate's task would keep running until + // the picker's task has a chance of being scheduled, because dropping a task happens + // asynchronously. + self.pending_update_matches = Some(PendingUpdateMatches { + delegate_update_matches: Some(delegate_pending_update_matches), + _task: cx.spawn(|this, mut cx| async move { + let delegate_pending_update_matches = this.update(&mut cx, |this, _| { + this.pending_update_matches + .as_mut() + .unwrap() + .delegate_update_matches + .take() + .unwrap() + })?; + delegate_pending_update_matches.await; + this.update(&mut cx, |this, cx| { + this.matches_updated(cx); + }) + }), + }); } fn matches_updated(&mut self, cx: &mut ViewContext) { diff --git a/crates/workspace/src/item.rs b/crates/workspace/src/item.rs index d5d8aed39dd300f3e5dd9eccd549d79272b2bae9..ed5b29c3269b9bc9ce3712be1cb8cd467ab91f00 100644 --- a/crates/workspace/src/item.rs +++ b/crates/workspace/src/item.rs @@ -11,6 +11,7 @@ use client::{ proto::{self, PeerId}, Client, }; +use futures::{channel::mpsc, StreamExt}; use gpui::{ AnyElement, AnyView, AppContext, Entity, EntityId, EventEmitter, FocusHandle, FocusableView, HighlightStyle, Model, Pixels, Point, SharedString, Task, View, ViewContext, WeakView, @@ -27,14 +28,13 @@ use std::{ ops::Range, path::PathBuf, rc::Rc, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, time::Duration, }; use theme::Theme; +pub const LEADER_UPDATE_THROTTLE: Duration = Duration::from_millis(200); + #[derive(Deserialize)] pub struct ItemSettings { pub git_status: bool, @@ -405,7 +405,7 @@ impl ItemHandle for View { followed_item.is_project_item(cx), proto::update_followers::Variant::CreateView(proto::View { id: followed_item - .remote_id(&workspace.app_state.client, cx) + .remote_id(&workspace.client(), cx) .map(|id| id.to_proto()), variant: Some(message), leader_id: workspace.leader_for_pane(&pane), @@ -421,8 +421,46 @@ impl ItemHandle for View { .is_none() { let mut pending_autosave = DelayedDebouncedEditAction::new(); + let (pending_update_tx, mut pending_update_rx) = mpsc::unbounded(); let pending_update = Rc::new(RefCell::new(None)); - let pending_update_scheduled = Arc::new(AtomicBool::new(false)); + + let mut send_follower_updates = None; + if let Some(item) = self.to_followable_item_handle(cx) { + let is_project_item = item.is_project_item(cx); + let item = item.downgrade(); + + send_follower_updates = Some(cx.spawn({ + let pending_update = pending_update.clone(); + |workspace, mut cx| async move { + while let Some(mut leader_id) = pending_update_rx.next().await { + while let Ok(Some(id)) = pending_update_rx.try_next() { + leader_id = id; + } + + workspace.update(&mut cx, |workspace, cx| { + let item = item.upgrade().expect( + "item to be alive, otherwise task would have been dropped", + ); + workspace.update_followers( + is_project_item, + proto::update_followers::Variant::UpdateView( + proto::UpdateView { + id: item + .remote_id(workspace.client(), cx) + .map(|id| id.to_proto()), + variant: pending_update.borrow_mut().take(), + leader_id, + }, + ), + cx, + ); + })?; + cx.background_executor().timer(LEADER_UPDATE_THROTTLE).await; + } + anyhow::Ok(()) + } + })); + } let mut event_subscription = Some(cx.subscribe(self, move |workspace, item, event, cx| { @@ -438,9 +476,7 @@ impl ItemHandle for View { }; if let Some(item) = item.to_followable_item_handle(cx) { - let is_project_item = item.is_project_item(cx); let leader_id = workspace.leader_for_pane(&pane); - let follow_event = item.to_follow_event(event); if leader_id.is_some() && matches!(follow_event, Some(FollowEvent::Unfollow)) @@ -448,35 +484,13 @@ impl ItemHandle for View { workspace.unfollow(&pane, cx); } - if item.focus_handle(cx).contains_focused(cx) - && item.add_event_to_update_proto( + if item.focus_handle(cx).contains_focused(cx) { + item.add_event_to_update_proto( event, &mut *pending_update.borrow_mut(), cx, - ) - && !pending_update_scheduled.load(Ordering::SeqCst) - { - pending_update_scheduled.store(true, Ordering::SeqCst); - cx.defer({ - let pending_update = pending_update.clone(); - let pending_update_scheduled = pending_update_scheduled.clone(); - move |this, cx| { - pending_update_scheduled.store(false, Ordering::SeqCst); - this.update_followers( - is_project_item, - proto::update_followers::Variant::UpdateView( - proto::UpdateView { - id: item - .remote_id(&this.app_state.client, cx) - .map(|id| id.to_proto()), - variant: pending_update.borrow_mut().take(), - leader_id, - }, - ), - cx, - ); - } - }); + ); + pending_update_tx.unbounded_send(leader_id).ok(); } } @@ -525,6 +539,7 @@ impl ItemHandle for View { cx.observe_release(self, move |workspace, _, _| { workspace.panes_by_item.remove(&item_id); event_subscription.take(); + send_follower_updates.take(); }) .detach(); } @@ -700,6 +715,7 @@ pub trait FollowableItem: Item { pub trait FollowableItemHandle: ItemHandle { fn remote_id(&self, client: &Arc, cx: &WindowContext) -> Option; + fn downgrade(&self) -> Box; fn set_leader_peer_id(&self, leader_peer_id: Option, cx: &mut WindowContext); fn to_state_proto(&self, cx: &WindowContext) -> Option; fn add_event_to_update_proto( @@ -728,6 +744,10 @@ impl FollowableItemHandle for View { }) } + fn downgrade(&self) -> Box { + Box::new(self.downgrade()) + } + fn set_leader_peer_id(&self, leader_peer_id: Option, cx: &mut WindowContext) { self.update(cx, |this, cx| this.set_leader_peer_id(leader_peer_id, cx)) } @@ -767,6 +787,16 @@ impl FollowableItemHandle for View { } } +pub trait WeakFollowableItemHandle: Send + Sync { + fn upgrade(&self) -> Option>; +} + +impl WeakFollowableItemHandle for WeakView { + fn upgrade(&self) -> Option> { + Some(Box::new(self.upgrade()?)) + } +} + #[cfg(any(test, feature = "test-support"))] pub mod test { use super::{Item, ItemEvent}; diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 74eec49d2ee974ceb185948f257771388b023810..344c92773690681b127e57e56c5c314eefbc4374 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -1109,7 +1109,7 @@ impl Workspace { ) } - pub fn client(&self) -> &Client { + pub fn client(&self) -> &Arc { &self.app_state.client }