Follower simplification (#8026)

Conrad Irwin created

Release Notes:

- Improved reliability of following

Change summary

crates/collab/src/rpc.rs          |  17 +-
crates/rpc/proto/zed.proto        |   8 +
crates/workspace/src/workspace.rs | 194 ++++++++++++++++++++++----------
3 files changed, 143 insertions(+), 76 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -2088,21 +2088,16 @@ async fn update_followers(request: proto::UpdateFollowers, session: Session) ->
     };
 
     // For now, don't send view update messages back to that view's current leader.
-    let connection_id_to_omit = request.variant.as_ref().and_then(|variant| match variant {
+    let peer_id_to_omit = request.variant.as_ref().and_then(|variant| match variant {
         proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
         _ => None,
     });
 
-    for follower_peer_id in request.follower_ids.iter().copied() {
-        let follower_connection_id = follower_peer_id.into();
-        if Some(follower_peer_id) != connection_id_to_omit
-            && connection_ids.contains(&follower_connection_id)
-        {
-            session.peer.forward_send(
-                session.connection_id,
-                follower_connection_id,
-                request.clone(),
-            )?;
+    for connection_id in connection_ids.iter().cloned() {
+        if Some(connection_id.into()) != peer_id_to_omit && connection_id != session.connection_id {
+            session
+                .peer
+                .forward_send(session.connection_id, connection_id, request.clone())?;
         }
     }
     Ok(())

crates/rpc/proto/zed.proto 🔗

@@ -1290,6 +1290,8 @@ message Follow {
 }
 
 message FollowResponse {
+    View active_view = 3;
+    // TODO: after 0.124.0 is retired, remove these.
     optional ViewId active_view_id = 1;
     repeated View views = 2;
 }
@@ -1297,10 +1299,11 @@ message FollowResponse {
 message UpdateFollowers {
     uint64 room_id = 1;
     optional uint64 project_id = 2;
-    repeated PeerId follower_ids = 3;
+    reserved 3;
     oneof variant {
-        UpdateActiveView update_active_view = 4;
         View create_view = 5;
+        // TODO: after 0.124.0 is retired, remove these.
+        UpdateActiveView update_active_view = 4;
         UpdateView update_view = 6;
     }
 }
@@ -1329,6 +1332,7 @@ message ViewId {
 message UpdateActiveView {
     optional ViewId id = 1;
     optional PeerId leader_id = 2;
+    View view = 3;
 }
 
 message UpdateView {

crates/workspace/src/workspace.rs 🔗

@@ -358,7 +358,6 @@ impl Global for GlobalAppState {}
 
 pub struct WorkspaceStore {
     workspaces: HashSet<WindowHandle<Workspace>>,
-    followers: Vec<Follower>,
     client: Arc<Client>,
     _subscriptions: Vec<client::Subscription>,
 }
@@ -2509,6 +2508,10 @@ impl Workspace {
                 };
                 Ok::<_, anyhow::Error>(())
             })??;
+            if let Some(view) = response.active_view {
+                Self::add_view_from_leader(this.clone(), leader_id, pane.clone(), &view, &mut cx)
+                    .await?;
+            }
             Self::add_views_from_leader(
                 this.clone(),
                 leader_id,
@@ -2726,6 +2729,23 @@ impl Workspace {
 
     // RPC handlers
 
+    fn active_view_for_follower(&self, cx: &mut ViewContext<Self>) -> Option<proto::View> {
+        let item = self.active_item(cx)?;
+        let leader_id = self
+            .pane_for(&*item)
+            .and_then(|pane| self.leader_for_pane(&pane));
+
+        let item_handle = item.to_followable_item_handle(cx)?;
+        let id = item_handle.remote_id(&self.app_state.client, cx)?;
+        let variant = item_handle.to_state_proto(cx)?;
+
+        Some(proto::View {
+            id: Some(id.to_proto()),
+            leader_id,
+            variant: Some(variant),
+        })
+    }
+
     fn handle_follow(
         &mut self,
         follower_project_id: Option<u64>,
@@ -2734,17 +2754,14 @@ impl Workspace {
         let client = &self.app_state.client;
         let project_id = self.project.read(cx).remote_id();
 
-        let active_view_id = self.active_item(cx).and_then(|i| {
-            Some(
-                i.to_followable_item_handle(cx)?
-                    .remote_id(client, cx)?
-                    .to_proto(),
-            )
-        });
+        let active_view = self.active_view_for_follower(cx);
+        let active_view_id = active_view.as_ref().and_then(|view| view.id.clone());
 
         cx.notify();
 
         proto::FollowResponse {
+            active_view,
+            // TODO: once v0.124.0 is retired we can stop sending these
             active_view_id,
             views: self
                 .panes()
@@ -2802,19 +2819,35 @@ impl Workspace {
     ) -> Result<()> {
         match update.variant.ok_or_else(|| anyhow!("invalid update"))? {
             proto::update_followers::Variant::UpdateActiveView(update_active_view) => {
-                this.update(cx, |this, _| {
-                    for (_, state) in &mut this.follower_states {
-                        if state.leader_id == leader_id {
-                            state.active_view_id =
-                                if let Some(active_view_id) = update_active_view.id.clone() {
-                                    Some(ViewId::from_proto(active_view_id)?)
-                                } else {
-                                    None
-                                };
+                let panes_missing_view = this.update(cx, |this, _| {
+                    let mut panes = vec![];
+                    for (pane, state) in &mut this.follower_states {
+                        if state.leader_id != leader_id {
+                            continue;
+                        }
+
+                        state.active_view_id =
+                            if let Some(active_view_id) = update_active_view.id.clone() {
+                                Some(ViewId::from_proto(active_view_id)?)
+                            } else {
+                                None
+                            };
+
+                        if state.active_view_id.is_some_and(|view_id| {
+                            !state.items_by_leader_view_id.contains_key(&view_id)
+                        }) {
+                            panes.push(pane.clone())
                         }
                     }
-                    anyhow::Ok(())
+                    anyhow::Ok(panes)
                 })??;
+
+                if let Some(view) = update_active_view.view {
+                    for pane in panes_missing_view {
+                        Self::add_view_from_leader(this.clone(), leader_id, pane.clone(), &view, cx)
+                            .await?
+                    }
+                }
             }
             proto::update_followers::Variant::UpdateView(update_view) => {
                 let variant = update_view
@@ -2853,6 +2886,56 @@ impl Workspace {
         Ok(())
     }
 
+    async fn add_view_from_leader(
+        this: WeakView<Self>,
+        leader_id: PeerId,
+        pane: View<Pane>,
+        view: &proto::View,
+        cx: &mut AsyncWindowContext,
+    ) -> Result<()> {
+        let this = this.upgrade().context("workspace dropped")?;
+
+        let item_builders = cx.update(|cx| {
+            cx.default_global::<FollowableItemBuilders>()
+                .values()
+                .map(|b| b.0)
+                .collect::<Vec<_>>()
+        })?;
+
+        let Some(id) = view.id.clone() else {
+            return Err(anyhow!("no id for view")).into();
+        };
+        let id = ViewId::from_proto(id)?;
+
+        let mut variant = view.variant.clone();
+        if variant.is_none() {
+            Err(anyhow!("missing view variant"))?;
+        }
+
+        let task = item_builders.iter().find_map(|build_item| {
+            cx.update(|cx| build_item(pane.clone(), this.clone(), id, &mut variant, cx))
+                .log_err()
+                .flatten()
+        });
+        let Some(task) = task else {
+            return Err(anyhow!(
+                "failed to construct view from leader (maybe from a different version of zed?)"
+            ));
+        };
+
+        let item = task.await?;
+
+        this.update(cx, |this, cx| {
+            let state = this.follower_states.get_mut(&pane)?;
+            item.set_leader_peer_id(Some(leader_id), cx);
+            state.items_by_leader_view_id.insert(id, item);
+
+            Some(())
+        })?;
+
+        Ok(())
+    }
+
     async fn add_views_from_leader(
         this: WeakView<Self>,
         leader_id: PeerId,
@@ -2920,13 +3003,31 @@ impl Workspace {
         if cx.is_window_active() {
             if let Some(item) = self.active_item(cx) {
                 if item.focus_handle(cx).contains_focused(cx) {
+                    let leader_id = self
+                        .pane_for(&*item)
+                        .and_then(|pane| self.leader_for_pane(&pane));
+
                     if let Some(item) = item.to_followable_item_handle(cx) {
-                        is_project_item = item.is_project_item(cx);
-                        update = proto::UpdateActiveView {
-                            id: item
-                                .remote_id(&self.app_state.client, cx)
-                                .map(|id| id.to_proto()),
-                            leader_id: self.leader_for_pane(&self.active_pane),
+                        let id = item
+                            .remote_id(&self.app_state.client, cx)
+                            .map(|id| id.to_proto());
+
+                        if let Some(id) = id.clone() {
+                            if let Some(variant) = item.to_state_proto(cx) {
+                                let view = Some(proto::View {
+                                    id: Some(id.clone()),
+                                    leader_id,
+                                    variant: Some(variant),
+                                });
+
+                                is_project_item = item.is_project_item(cx);
+                                update = proto::UpdateActiveView {
+                                    view,
+                                    // TODO: once v0.124.0 is retired we can stop sending these
+                                    id: Some(id),
+                                    leader_id,
+                                };
+                            }
                         };
                     }
                 }
@@ -3789,10 +3890,8 @@ impl WorkspaceStore {
     pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
         Self {
             workspaces: Default::default(),
-            followers: Default::default(),
             _subscriptions: vec![
                 client.add_request_handler(cx.weak_model(), Self::handle_follow),
-                client.add_message_handler(cx.weak_model(), Self::handle_unfollow),
                 client.add_message_handler(cx.weak_model(), Self::handle_update_followers),
             ],
             client,
@@ -3807,25 +3906,10 @@ impl WorkspaceStore {
     ) -> Option<()> {
         let active_call = ActiveCall::try_global(cx)?;
         let room_id = active_call.read(cx).room()?.read(cx).id();
-        let follower_ids: Vec<_> = self
-            .followers
-            .iter()
-            .filter_map(|follower| {
-                if follower.project_id == project_id || project_id.is_none() {
-                    Some(follower.peer_id.into())
-                } else {
-                    None
-                }
-            })
-            .collect();
-        if follower_ids.is_empty() {
-            return None;
-        }
         self.client
             .send(proto::UpdateFollowers {
                 room_id,
                 project_id,
-                follower_ids,
                 variant: Some(update),
             })
             .log_err()
@@ -3862,36 +3946,20 @@ impl WorkspaceStore {
                                 response.active_view_id = Some(active_view_id);
                             }
                         }
+
+                        if let Some(active_view) = handler_response.active_view.clone() {
+                            if workspace.project.read(cx).remote_id() == follower.project_id {
+                                response.active_view = Some(active_view)
+                            }
+                        }
                     })
                     .is_ok()
             });
 
-            if let Err(ix) = this.followers.binary_search(&follower) {
-                this.followers.insert(ix, follower);
-            }
-
             Ok(response)
         })?
     }
 
-    async fn handle_unfollow(
-        model: Model<Self>,
-        envelope: TypedEnvelope<proto::Unfollow>,
-        _: Arc<Client>,
-        mut cx: AsyncAppContext,
-    ) -> Result<()> {
-        model.update(&mut cx, |this, _| {
-            let follower = Follower {
-                project_id: envelope.payload.project_id,
-                peer_id: envelope.original_sender_id()?,
-            };
-            if let Ok(ix) = this.followers.binary_search(&follower) {
-                this.followers.remove(ix);
-            }
-            Ok(())
-        })?
-    }
-
     async fn handle_update_followers(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::UpdateFollowers>,