Fetch missing buffers when adding excerpts to a multibuffer while following

Max Brunsfeld and Antonio Scandurra created

Make FollowableItem::apply_update_proto asynchronous. Use a single
task per workspace to process all leader updates, to prevent updates
from being interleaved.

Co-authored-by: Antonio Scandurra <antonio@zed.dev>

Change summary

crates/editor/src/editor_tests.rs |  92 ++++++++++-------
crates/editor/src/items.rs        |  83 +++++++++------
crates/workspace/src/workspace.rs | 171 +++++++++++++++-----------------
3 files changed, 180 insertions(+), 166 deletions(-)

Detailed changes

crates/editor/src/editor_tests.rs 🔗

@@ -5006,11 +5006,12 @@ async fn test_following(cx: &mut gpui::TestAppContext) {
     leader.update(cx, |leader, cx| {
         leader.change_selections(None, cx, |s| s.select_ranges([1..1]));
     });
-    follower.update(cx, |follower, cx| {
-        follower
-            .apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
-            .unwrap();
-    });
+    follower
+        .update(cx, |follower, cx| {
+            follower.apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
+        })
+        .await
+        .unwrap();
     follower.read_with(cx, |follower, cx| {
         assert_eq!(follower.selections.ranges(cx), vec![1..1]);
     });
@@ -5019,31 +5020,32 @@ async fn test_following(cx: &mut gpui::TestAppContext) {
     leader.update(cx, |leader, cx| {
         leader.set_scroll_position(vec2f(1.5, 3.5), cx);
     });
-    follower.update(cx, |follower, cx| {
-        follower
-            .apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
-            .unwrap();
-    });
+    follower
+        .update(cx, |follower, cx| {
+            follower.apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
+        })
+        .await
+        .unwrap();
     assert_eq!(
         follower.update(cx, |follower, cx| follower.scroll_position(cx)),
         vec2f(1.5, 3.5)
     );
 
-    // Update the selections and scroll position
+    // Update the selections and scroll position. The follower's scroll position is updated
+    // via autoscroll, not via the leader's exact scroll position.
     leader.update(cx, |leader, cx| {
         leader.change_selections(None, cx, |s| s.select_ranges([0..0]));
         leader.request_autoscroll(Autoscroll::newest(), cx);
         leader.set_scroll_position(vec2f(1.5, 3.5), cx);
     });
+    follower
+        .update(cx, |follower, cx| {
+            follower.apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
+        })
+        .await
+        .unwrap();
     follower.update(cx, |follower, cx| {
-        let initial_scroll_position = follower.scroll_position(cx);
-        follower
-            .apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
-            .unwrap();
-        assert_eq!(follower.scroll_position(cx), initial_scroll_position);
-        assert!(follower.autoscroll_request.is_some());
-    });
-    follower.read_with(cx, |follower, cx| {
+        assert_eq!(follower.scroll_position(cx), vec2f(1.5, 0.0));
         assert_eq!(follower.selections.ranges(cx), vec![0..0]);
     });
 
@@ -5052,11 +5054,12 @@ async fn test_following(cx: &mut gpui::TestAppContext) {
         leader.change_selections(None, cx, |s| s.select_ranges([1..1]));
         leader.begin_selection(DisplayPoint::new(0, 0), true, 1, cx);
     });
-    follower.update(cx, |follower, cx| {
-        follower
-            .apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
-            .unwrap();
-    });
+    follower
+        .update(cx, |follower, cx| {
+            follower.apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
+        })
+        .await
+        .unwrap();
     follower.read_with(cx, |follower, cx| {
         assert_eq!(follower.selections.ranges(cx), vec![0..0, 1..1]);
     });
@@ -5065,11 +5068,12 @@ async fn test_following(cx: &mut gpui::TestAppContext) {
     leader.update(cx, |leader, cx| {
         leader.extend_selection(DisplayPoint::new(0, 2), 1, cx);
     });
-    follower.update(cx, |follower, cx| {
-        follower
-            .apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
-            .unwrap();
-    });
+    follower
+        .update(cx, |follower, cx| {
+            follower.apply_update_proto(&project, pending_update.borrow_mut().take().unwrap(), cx)
+        })
+        .await
+        .unwrap();
     follower.read_with(cx, |follower, cx| {
         assert_eq!(follower.selections.ranges(cx), vec![0..2]);
     });
@@ -5175,11 +5179,16 @@ async fn test_following_with_multiple_excerpts(cx: &mut gpui::TestAppContext) {
     );
 
     // Apply the update of adding the excerpts.
-    follower_1.update(cx, |follower, cx| {
-        follower
-            .apply_update_proto(&project, follower_1_update.borrow_mut().take().unwrap(), cx)
-            .unwrap()
-    });
+    follower_1
+        .update(cx, |follower, cx| {
+            follower.apply_update_proto(
+                &project,
+                follower_1_update.borrow_mut().take().unwrap(),
+                cx,
+            )
+        })
+        .await
+        .unwrap();
     assert_eq!(
         follower_1.read_with(cx, Editor::text),
         leader.read_with(cx, Editor::text)
@@ -5195,11 +5204,16 @@ async fn test_following_with_multiple_excerpts(cx: &mut gpui::TestAppContext) {
     });
 
     // Apply the update of removing the excerpts.
-    follower_1.update(cx, |follower, cx| {
-        follower
-            .apply_update_proto(&project, follower_1_update.borrow_mut().take().unwrap(), cx)
-            .unwrap()
-    });
+    follower_1
+        .update(cx, |follower, cx| {
+            follower.apply_update_proto(
+                &project,
+                follower_1_update.borrow_mut().take().unwrap(),
+                cx,
+            )
+        })
+        .await
+        .unwrap();
     assert_eq!(
         follower_1.read_with(cx, Editor::text),
         leader.read_with(cx, Editor::text)

crates/editor/src/items.rs 🔗

@@ -5,6 +5,7 @@ use crate::{
 };
 use anyhow::{anyhow, Result};
 use collections::HashSet;
+use futures::future::try_join_all;
 use futures::FutureExt;
 use gpui::{
     elements::*, geometry::vector::vec2f, AppContext, Entity, ModelHandle, MutableAppContext,
@@ -275,36 +276,53 @@ impl FollowableItem for Editor {
         project: &ModelHandle<Project>,
         message: update_view::Variant,
         cx: &mut ViewContext<Self>,
-    ) -> Result<()> {
-        match message {
-            update_view::Variant::Editor(message) => {
-                let multibuffer = self.buffer.read(cx);
-                let multibuffer = multibuffer.read(cx);
-                let mut removals = message
-                    .deleted_excerpts
-                    .into_iter()
-                    .map(ExcerptId::from_proto)
-                    .collect::<Vec<_>>();
-                removals.sort_by(|a, b| a.cmp(&b, &multibuffer));
+    ) -> Task<Result<()>> {
+        let update_view::Variant::Editor(message) = message;
+        let multibuffer = self.buffer.read(cx);
+        let multibuffer = multibuffer.read(cx);
 
-                let selections = message
-                    .selections
-                    .into_iter()
-                    .filter_map(|selection| deserialize_selection(&multibuffer, selection))
-                    .collect::<Vec<_>>();
-                let scroll_top_anchor = message
-                    .scroll_top_anchor
-                    .and_then(|anchor| deserialize_anchor(&multibuffer, anchor));
-                drop(multibuffer);
+        let buffer_ids = message
+            .inserted_excerpts
+            .iter()
+            .filter_map(|insertion| Some(insertion.excerpt.as_ref()?.buffer_id))
+            .collect::<HashSet<_>>();
+
+        let mut removals = message
+            .deleted_excerpts
+            .into_iter()
+            .map(ExcerptId::from_proto)
+            .collect::<Vec<_>>();
+        removals.sort_by(|a, b| a.cmp(&b, &multibuffer));
+
+        let selections = message
+            .selections
+            .into_iter()
+            .filter_map(|selection| deserialize_selection(&multibuffer, selection))
+            .collect::<Vec<_>>();
+        let scroll_top_anchor = message
+            .scroll_top_anchor
+            .and_then(|anchor| deserialize_anchor(&multibuffer, anchor));
+        drop(multibuffer);
+
+        let buffers = project.update(cx, |project, cx| {
+            buffer_ids
+                .into_iter()
+                .map(|id| project.open_buffer_by_id(id, cx))
+                .collect::<Vec<_>>()
+        });
 
-                self.buffer.update(cx, |multibuffer, cx| {
+        let project = project.clone();
+        cx.spawn(|this, mut cx| async move {
+            let _buffers = try_join_all(buffers).await?;
+            this.update(&mut cx, |this, cx| {
+                this.buffer.update(cx, |multibuffer, cx| {
                     let mut insertions = message.inserted_excerpts.into_iter().peekable();
                     while let Some(insertion) = insertions.next() {
                         let Some(excerpt) = insertion.excerpt else { continue };
                         let Some(previous_excerpt_id) = insertion.previous_excerpt_id else { continue };
                         let buffer_id = excerpt.buffer_id;
                         let Some(buffer) = project.read(cx).buffer_for_id(buffer_id, cx) else { continue };
-    
+
                         let adjacent_excerpts = iter::from_fn(|| {
                             let insertion = insertions.peek()?;
                             if insertion.previous_excerpt_id.is_none()
@@ -315,7 +333,7 @@ impl FollowableItem for Editor {
                                 None
                             }
                         });
-    
+
                         multibuffer.insert_excerpts_with_ids_after(
                             ExcerptId::from_proto(previous_excerpt_id),
                             buffer,
@@ -331,24 +349,19 @@ impl FollowableItem for Editor {
                             cx,
                         );
                     }
-                    
+
                     multibuffer.remove_excerpts(removals, cx);
                 });
 
-
                 if !selections.is_empty() {
-                    self.set_selections_from_remote(selections, cx);
-                    self.request_autoscroll_remotely(Autoscroll::newest(), cx);
+                    this.set_selections_from_remote(selections, cx);
+                    this.request_autoscroll_remotely(Autoscroll::newest(), cx);
                 } else if let Some(anchor) = scroll_top_anchor {
-                    self.set_scroll_top_anchor(
-                        anchor,
-                        vec2f(message.scroll_x, message.scroll_y),
-                        cx,
-                    );
+                    this.set_scroll_top_anchor(anchor, vec2f(message.scroll_x, message.scroll_y), cx);
                 }
-            }
-        }
-        Ok(())
+            });
+            Ok(())
+        })
     }
 
     fn should_unfollow_on_event(event: &Self::Event, _: &AppContext) -> bool {

crates/workspace/src/workspace.rs 🔗

@@ -11,14 +11,18 @@ pub mod sidebar;
 mod status_bar;
 mod toolbar;
 
-use anyhow::{anyhow, Context, Result};
+use anyhow::{anyhow, Result};
 use call::ActiveCall;
 use client::{proto, Client, PeerId, TypedEnvelope, UserStore};
 use collections::{hash_map, HashMap, HashSet};
 use dock::{DefaultItemFactory, Dock, ToggleDockButton};
 use drag_and_drop::DragAndDrop;
 use fs::{self, Fs};
-use futures::{channel::oneshot, FutureExt, StreamExt};
+use futures::{
+    channel::{mpsc, oneshot},
+    future::try_join_all,
+    FutureExt, StreamExt,
+};
 use gpui::{
     actions,
     elements::*,
@@ -466,7 +470,7 @@ pub trait FollowableItem: Item {
         project: &ModelHandle<Project>,
         message: proto::update_view::Variant,
         cx: &mut ViewContext<Self>,
-    ) -> Result<()>;
+    ) -> Task<Result<()>>;
 
     fn set_leader_replica_id(&mut self, leader_replica_id: Option<u16>, cx: &mut ViewContext<Self>);
     fn should_unfollow_on_event(event: &Self::Event, cx: &AppContext) -> bool;
@@ -486,7 +490,7 @@ pub trait FollowableItemHandle: ItemHandle {
         project: &ModelHandle<Project>,
         message: proto::update_view::Variant,
         cx: &mut MutableAppContext,
-    ) -> Result<()>;
+    ) -> Task<Result<()>>;
     fn should_unfollow_on_event(&self, event: &dyn Any, cx: &AppContext) -> bool;
 }
 
@@ -519,7 +523,7 @@ impl<T: FollowableItem> FollowableItemHandle for ViewHandle<T> {
         project: &ModelHandle<Project>,
         message: proto::update_view::Variant,
         cx: &mut MutableAppContext,
-    ) -> Result<()> {
+    ) -> Task<Result<()>> {
         self.update(cx, |this, cx| this.apply_update_proto(project, message, cx))
     }
 
@@ -1089,6 +1093,8 @@ pub struct Workspace {
     last_leaders_by_pane: HashMap<WeakViewHandle<Pane>, PeerId>,
     window_edited: bool,
     active_call: Option<(ModelHandle<ActiveCall>, Vec<gpui::Subscription>)>,
+    leader_updates_tx: mpsc::UnboundedSender<(PeerId, proto::UpdateFollowers)>,
+    _apply_leader_updates: Task<Result<()>>,
     _observe_current_user: Task<()>,
 }
 
@@ -1102,13 +1108,7 @@ type FollowerStatesByLeader = HashMap<PeerId, HashMap<ViewHandle<Pane>, Follower
 #[derive(Default)]
 struct FollowerState {
     active_view_id: Option<u64>,
-    items_by_leader_view_id: HashMap<u64, FollowerItem>,
-}
-
-#[derive(Debug)]
-enum FollowerItem {
-    Loading(Vec<proto::update_view::Variant>),
-    Loaded(Box<dyn FollowableItemHandle>),
+    items_by_leader_view_id: HashMap<u64, Box<dyn FollowableItemHandle>>,
 }
 
 impl Workspace {
@@ -1171,10 +1171,24 @@ impl Workspace {
                 })
             }
         });
-
         let handle = cx.handle();
         let weak_handle = cx.weak_handle();
 
+        // All leader updates are enqueued and then processed in a single task, so
+        // that each asynchronous operation can be run in order.
+        let (leader_updates_tx, mut leader_updates_rx) =
+            mpsc::unbounded::<(PeerId, proto::UpdateFollowers)>();
+        let _apply_leader_updates = cx.spawn_weak(|this, mut cx| async move {
+            while let Some((leader_id, update)) = leader_updates_rx.next().await {
+                let Some(this) = this.upgrade(&cx) else { break };
+                Self::process_leader_update(this, leader_id, update, &mut cx)
+                    .await
+                    .log_err();
+            }
+
+            Ok(())
+        });
+
         cx.emit_global(WorkspaceCreated(weak_handle.clone()));
 
         let dock = Dock::new(cx, dock_default_factory);
@@ -1234,6 +1248,8 @@ impl Workspace {
             window_edited: false,
             active_call,
             _observe_current_user,
+            _apply_leader_updates,
+            leader_updates_tx,
         };
         this.project_remote_id_changed(this.project.read(cx).remote_id(), cx);
         cx.defer(|this, cx| this.update_window_title(cx));
@@ -2119,9 +2135,7 @@ impl Workspace {
         if let Some(states_by_pane) = self.follower_states_by_leader.remove(&peer_id) {
             for state in states_by_pane.into_values() {
                 for item in state.items_by_leader_view_id.into_values() {
-                    if let FollowerItem::Loaded(item) = item {
-                        item.set_leader_replica_id(None, cx);
-                    }
+                    item.set_leader_replica_id(None, cx);
                 }
             }
         }
@@ -2167,8 +2181,15 @@ impl Workspace {
                     state.active_view_id = response.active_view_id;
                     Ok::<_, anyhow::Error>(())
                 })?;
-                Self::add_views_from_leader(this, leader_id, vec![pane], response.views, &mut cx)
-                    .await?;
+                Self::add_views_from_leader(
+                    this.clone(),
+                    leader_id,
+                    vec![pane],
+                    response.views,
+                    &mut cx,
+                )
+                .await?;
+                this.update(&mut cx, |this, cx| this.leader_updated(leader_id, cx));
             }
             Ok(())
         }))
@@ -2214,9 +2235,7 @@ impl Workspace {
             let leader_id = *leader_id;
             if let Some(state) = states_by_pane.remove(pane) {
                 for (_, item) in state.items_by_leader_view_id {
-                    if let FollowerItem::Loaded(item) = item {
-                        item.set_leader_replica_id(None, cx);
-                    }
+                    item.set_leader_replica_id(None, cx);
                 }
 
                 if states_by_pane.is_empty() {
@@ -2459,46 +2478,51 @@ impl Workspace {
         this: ViewHandle<Self>,
         envelope: TypedEnvelope<proto::UpdateFollowers>,
         _: Arc<Client>,
-        mut cx: AsyncAppContext,
+        cx: AsyncAppContext,
     ) -> Result<()> {
         let leader_id = envelope.original_sender_id()?;
-        match envelope
-            .payload
-            .variant
-            .ok_or_else(|| anyhow!("invalid update"))?
-        {
+        this.read_with(&cx, |this, _| {
+            this.leader_updates_tx
+                .unbounded_send((leader_id, envelope.payload))
+        })?;
+        Ok(())
+    }
+
+    async fn process_leader_update(
+        this: ViewHandle<Self>,
+        leader_id: PeerId,
+        update: proto::UpdateFollowers,
+        cx: &mut AsyncAppContext,
+    ) -> Result<()> {
+        match update.variant.ok_or_else(|| anyhow!("invalid update"))? {
             proto::update_followers::Variant::UpdateActiveView(update_active_view) => {
-                this.update(&mut cx, |this, cx| {
-                    this.update_leader_state(leader_id, cx, |state, _| {
-                        state.active_view_id = update_active_view.id;
-                    });
-                    Ok::<_, anyhow::Error>(())
-                })
+                this.update(cx, |this, _| {
+                    if let Some(state) = this.follower_states_by_leader.get_mut(&leader_id) {
+                        for state in state.values_mut() {
+                            state.active_view_id = update_active_view.id;
+                        }
+                    }
+                });
             }
             proto::update_followers::Variant::UpdateView(update_view) => {
-                this.update(&mut cx, |this, cx| {
-                    let variant = update_view
-                        .variant
-                        .ok_or_else(|| anyhow!("missing update view variant"))?;
+                let variant = update_view
+                    .variant
+                    .ok_or_else(|| anyhow!("missing update view variant"))?;
+                let mut tasks = Vec::new();
+                this.update(cx, |this, cx| {
                     let project = this.project.clone();
-                    this.update_leader_state(leader_id, cx, |state, cx| {
-                        let variant = variant.clone();
-                        match state
-                            .items_by_leader_view_id
-                            .entry(update_view.id)
-                            .or_insert(FollowerItem::Loading(Vec::new()))
-                        {
-                            FollowerItem::Loaded(item) => {
-                                item.apply_update_proto(&project, variant, cx).log_err();
+                    if let Some(state) = this.follower_states_by_leader.get_mut(&leader_id) {
+                        for state in state.values_mut() {
+                            if let Some(item) = state.items_by_leader_view_id.get(&update_view.id) {
+                                tasks.push(item.apply_update_proto(&project, variant.clone(), cx));
                             }
-                            FollowerItem::Loading(updates) => updates.push(variant),
                         }
-                    });
-                    Ok(())
-                })
+                    }
+                });
+                try_join_all(tasks).await.log_err();
             }
             proto::update_followers::Variant::CreateView(view) => {
-                let panes = this.read_with(&cx, |this, _| {
+                let panes = this.read_with(cx, |this, _| {
                     this.follower_states_by_leader
                         .get(&leader_id)
                         .into_iter()
@@ -2506,13 +2530,10 @@ impl Workspace {
                         .cloned()
                         .collect()
                 });
-                Self::add_views_from_leader(this.clone(), leader_id, panes, vec![view], &mut cx)
-                    .await?;
-                Ok(())
+                Self::add_views_from_leader(this.clone(), leader_id, panes, vec![view], cx).await?;
             }
         }
-        .log_err();
-
+        this.update(cx, |this, cx| this.leader_updated(leader_id, cx));
         Ok(())
     }
 
@@ -2575,29 +2596,12 @@ impl Workspace {
 
                 for (id, item) in leader_view_ids.into_iter().zip(items) {
                     item.set_leader_replica_id(Some(replica_id), cx);
-                    match state.items_by_leader_view_id.entry(id) {
-                        hash_map::Entry::Occupied(e) => {
-                            let e = e.into_mut();
-                            if let FollowerItem::Loading(updates) = e {
-                                for update in updates.drain(..) {
-                                    item.apply_update_proto(&this.project, update, cx)
-                                        .context("failed to apply view update")
-                                        .log_err();
-                                }
-                            }
-                            *e = FollowerItem::Loaded(item);
-                        }
-                        hash_map::Entry::Vacant(e) => {
-                            e.insert(FollowerItem::Loaded(item));
-                        }
-                    }
+                    state.items_by_leader_view_id.insert(id, item);
                 }
 
                 Some(())
             });
         }
-        this.update(cx, |this, cx| this.leader_updated(leader_id, cx));
-
         Ok(())
     }
 
@@ -2631,23 +2635,6 @@ impl Workspace {
             })
     }
 
-    fn update_leader_state(
-        &mut self,
-        leader_id: PeerId,
-        cx: &mut ViewContext<Self>,
-        mut update_fn: impl FnMut(&mut FollowerState, &mut ViewContext<Self>),
-    ) {
-        for (_, state) in self
-            .follower_states_by_leader
-            .get_mut(&leader_id)
-            .into_iter()
-            .flatten()
-        {
-            update_fn(state, cx);
-        }
-        self.leader_updated(leader_id, cx);
-    }
-
     fn leader_updated(&mut self, leader_id: PeerId, cx: &mut ViewContext<Self>) -> Option<()> {
         cx.notify();
 
@@ -2660,7 +2647,7 @@ impl Workspace {
             call::ParticipantLocation::SharedProject { project_id } => {
                 if Some(project_id) == self.project.read(cx).remote_id() {
                     for (pane, state) in self.follower_states_by_leader.get(&leader_id)? {
-                        if let Some(FollowerItem::Loaded(item)) = state
+                        if let Some(item) = state
                             .active_view_id
                             .and_then(|id| state.items_by_leader_view_id.get(&id))
                         {