Avoid infinite loop when collaborators follow each other

Max Brunsfeld and Antonio Scandurra created

Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

crates/editor/src/items.rs        |   2 
crates/rpc/proto/zed.proto        |  37 +++++----
crates/server/src/rpc.rs          | 124 +++++++++++++++++++++++++++++++-
crates/workspace/src/workspace.rs |  73 ++++++++++++-------
4 files changed, 186 insertions(+), 50 deletions(-)

Detailed changes

crates/editor/src/items.rs 🔗

@@ -6,7 +6,7 @@ use gpui::{
 };
 use language::{Bias, Buffer, Diagnostic, File as _};
 use project::{File, Project, ProjectEntryId, ProjectPath};
-use rpc::proto::{self, update_followers::update_view};
+use rpc::proto::{self, update_view};
 use std::{fmt::Write, path::PathBuf};
 use text::{Point, Selection};
 use util::ResultExt;

crates/rpc/proto/zed.proto 🔗

@@ -556,21 +556,6 @@ message UpdateFollowers {
         View create_view = 4;
         UpdateView update_view = 5;
     }
-
-    message UpdateActiveView {
-        optional uint64 id = 1;
-    }
-
-    message UpdateView {
-        uint64 id = 1;
-        oneof variant {
-            Editor editor = 2;
-        }
-
-        message Editor {
-            Anchor scroll_top = 1;
-        }
-    }
 }
 
 message Unfollow {
@@ -580,10 +565,30 @@ message Unfollow {
 
 // Entities
 
+message UpdateActiveView {
+    optional uint64 id = 1;
+    optional uint32 leader_id = 2;
+}
+
+message UpdateView {
+    uint64 id = 1;
+    optional uint32 leader_id = 2;
+
+    oneof variant {
+        Editor editor = 3;
+    }
+
+    message Editor {
+        Anchor scroll_top = 1;
+    }
+}
+
 message View {
     uint64 id = 1;
+    optional uint32 leader_id = 2;
+
     oneof variant {
-        Editor editor = 2;
+        Editor editor = 3;
     }
 
     message Editor {

crates/server/src/rpc.rs 🔗

@@ -678,17 +678,21 @@ impl Server {
         request: TypedEnvelope<proto::Follow>,
     ) -> tide::Result<proto::FollowResponse> {
         let leader_id = ConnectionId(request.payload.leader_id);
+        let follower_id = request.sender_id;
         if !self
             .state()
-            .project_connection_ids(request.payload.project_id, request.sender_id)?
+            .project_connection_ids(request.payload.project_id, follower_id)?
             .contains(&leader_id)
         {
             Err(anyhow!("no such peer"))?;
         }
-        let response = self
+        let mut response = self
             .peer
             .forward_request(request.sender_id, leader_id, request.payload)
             .await?;
+        response
+            .views
+            .retain(|view| view.leader_id != Some(follower_id.0));
         Ok(response)
     }
 
@@ -716,9 +720,18 @@ impl Server {
         let connection_ids = self
             .state()
             .project_connection_ids(request.payload.project_id, request.sender_id)?;
+        let leader_id = request
+            .payload
+            .variant
+            .as_ref()
+            .and_then(|variant| match variant {
+                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
+                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
+                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
+            });
         for follower_id in &request.payload.follower_ids {
             let follower_id = ConnectionId(*follower_id);
-            if connection_ids.contains(&follower_id) {
+            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
                 self.peer
                     .forward_send(request.sender_id, follower_id, request.payload.clone())?;
             }
@@ -4265,9 +4278,6 @@ mod tests {
 
         // Client B opens an editor.
         let workspace_b = client_b.build_workspace(&project_b, cx_b);
-        workspace_b.update(cx_b, |workspace, cx| {
-            workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
-        });
         let editor_b1 = workspace_b
             .update(cx_b, |workspace, cx| {
                 workspace.open_path((worktree_id, "1.txt"), cx)
@@ -4328,6 +4338,108 @@ mod tests {
         );
     }
 
+    #[gpui::test(iterations = 10)]
+    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
+        cx_a.foreground().forbid_parking();
+        let fs = FakeFs::new(cx_a.background());
+
+        // 2 clients connect to a server.
+        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
+        let mut client_a = server.create_client(cx_a, "user_a").await;
+        let mut client_b = server.create_client(cx_b, "user_b").await;
+        cx_a.update(editor::init);
+        cx_b.update(editor::init);
+
+        // Client A shares a project.
+        fs.insert_tree(
+            "/a",
+            json!({
+                ".zed.toml": r#"collaborators = ["user_b"]"#,
+                "1.txt": "one",
+                "2.txt": "two",
+                "3.txt": "three",
+            }),
+        )
+        .await;
+        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
+        project_a
+            .update(cx_a, |project, cx| project.share(cx))
+            .await
+            .unwrap();
+
+        // Client B joins the project.
+        let project_b = client_b
+            .build_remote_project(
+                project_a
+                    .read_with(cx_a, |project, _| project.remote_id())
+                    .unwrap(),
+                cx_b,
+            )
+            .await;
+
+        // Client A opens some editors.
+        let workspace_a = client_a.build_workspace(&project_a, cx_a);
+        let _editor_a1 = workspace_a
+            .update(cx_a, |workspace, cx| {
+                workspace.open_path((worktree_id, "1.txt"), cx)
+            })
+            .await
+            .unwrap()
+            .downcast::<Editor>()
+            .unwrap();
+
+        // Client B opens an editor.
+        let workspace_b = client_b.build_workspace(&project_b, cx_b);
+        let _editor_b1 = workspace_b
+            .update(cx_b, |workspace, cx| {
+                workspace.open_path((worktree_id, "2.txt"), cx)
+            })
+            .await
+            .unwrap()
+            .downcast::<Editor>()
+            .unwrap();
+
+        // Clients A and B follow each other in split panes
+        workspace_a
+            .update(cx_a, |workspace, cx| {
+                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
+                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
+                workspace
+                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
+                    .unwrap()
+            })
+            .await
+            .unwrap();
+        workspace_b
+            .update(cx_b, |workspace, cx| {
+                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
+                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
+                workspace
+                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
+                    .unwrap()
+            })
+            .await
+            .unwrap();
+
+        workspace_a
+            .update(cx_a, |workspace, cx| {
+                workspace.activate_next_pane(cx);
+                workspace.open_path((worktree_id, "3.txt"), cx)
+            })
+            .await
+            .unwrap();
+
+        // Ensure peers following each other doesn't cause an infinite loop.
+        cx_a.foreground().run_until_parked();
+        assert_eq!(
+            workspace_b.read_with(cx_b, |workspace, cx| workspace
+                .active_item(cx)
+                .unwrap()
+                .project_path(cx)),
+            Some((worktree_id, "3.txt").into())
+        );
+    }
+
     #[gpui::test(iterations = 100)]
     async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
         cx.foreground().forbid_parking();

crates/workspace/src/workspace.rs 🔗

@@ -264,10 +264,10 @@ pub trait FollowableItem: Item {
         &self,
         event: &Self::Event,
         cx: &AppContext,
-    ) -> Option<proto::update_followers::update_view::Variant>;
+    ) -> Option<proto::update_view::Variant>;
     fn apply_update_message(
         &mut self,
-        message: proto::update_followers::update_view::Variant,
+        message: proto::update_view::Variant,
         cx: &mut ViewContext<Self>,
     ) -> Result<()>;
 }
@@ -279,10 +279,10 @@ pub trait FollowableItemHandle: ItemHandle {
         &self,
         event: &dyn Any,
         cx: &AppContext,
-    ) -> Option<proto::update_followers::update_view::Variant>;
+    ) -> Option<proto::update_view::Variant>;
     fn apply_update_message(
         &self,
-        message: proto::update_followers::update_view::Variant,
+        message: proto::update_view::Variant,
         cx: &mut MutableAppContext,
     ) -> Result<()>;
 }
@@ -300,13 +300,13 @@ impl<T: FollowableItem> FollowableItemHandle for ViewHandle<T> {
         &self,
         event: &dyn Any,
         cx: &AppContext,
-    ) -> Option<proto::update_followers::update_view::Variant> {
+    ) -> Option<proto::update_view::Variant> {
         self.read(cx).to_update_message(event.downcast_ref()?, cx)
     }
 
     fn apply_update_message(
         &self,
-        message: proto::update_followers::update_view::Variant,
+        message: proto::update_view::Variant,
         cx: &mut MutableAppContext,
     ) -> Result<()> {
         self.update(cx, |this, cx| this.apply_update_message(message, cx))
@@ -403,6 +403,7 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
                     proto::update_followers::Variant::CreateView(proto::View {
                         id: followed_item.id() as u64,
                         variant: Some(message),
+                        leader_id: workspace.leader_for_pane(&pane).map(|id| id.0),
                     }),
                     cx,
                 );
@@ -441,12 +442,11 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
                 .and_then(|i| i.to_update_message(event, cx))
             {
                 workspace.update_followers(
-                    proto::update_followers::Variant::UpdateView(
-                        proto::update_followers::UpdateView {
-                            id: item.id() as u64,
-                            variant: Some(message),
-                        },
-                    ),
+                    proto::update_followers::Variant::UpdateView(proto::UpdateView {
+                        id: item.id() as u64,
+                        variant: Some(message),
+                        leader_id: workspace.leader_for_pane(&pane).map(|id| id.0),
+                    }),
                     cx,
                 );
             }
@@ -628,7 +628,7 @@ struct FollowerState {
 
 #[derive(Debug)]
 enum FollowerItem {
-    Loading(Vec<proto::update_followers::update_view::Variant>),
+    Loading(Vec<proto::update_view::Variant>),
     Loaded(Box<dyn FollowableItemHandle>),
 }
 
@@ -1110,7 +1110,7 @@ impl Workspace {
 
     fn activate_pane(&mut self, pane: ViewHandle<Pane>, cx: &mut ViewContext<Self>) {
         if self.active_pane != pane {
-            self.active_pane = pane;
+            self.active_pane = pane.clone();
             self.status_bar.update(cx, |status_bar, cx| {
                 status_bar.set_active_pane(&self.active_pane, cx);
             });
@@ -1119,11 +1119,10 @@ impl Workspace {
         }
 
         self.update_followers(
-            proto::update_followers::Variant::UpdateActiveView(
-                proto::update_followers::UpdateActiveView {
-                    id: self.active_item(cx).map(|item| item.id() as u64),
-                },
-            ),
+            proto::update_followers::Variant::UpdateActiveView(proto::UpdateActiveView {
+                id: self.active_item(cx).map(|item| item.id() as u64),
+                leader_id: self.leader_for_pane(&pane).map(|id| id.0),
+            }),
             cx,
         );
     }
@@ -1520,14 +1519,22 @@ impl Workspace {
             Ok(proto::FollowResponse {
                 active_view_id,
                 views: this
-                    .items(cx)
-                    .filter_map(|item| {
-                        let id = item.id() as u64;
-                        let item = item.to_followable_item_handle(cx)?;
-                        let variant = item.to_state_message(cx)?;
-                        Some(proto::View {
-                            id,
-                            variant: Some(variant),
+                    .panes()
+                    .iter()
+                    .flat_map(|pane| {
+                        let leader_id = this.leader_for_pane(pane).map(|id| id.0);
+                        pane.read(cx).items().filter_map({
+                            let cx = &cx;
+                            move |item| {
+                                let id = item.id() as u64;
+                                let item = item.to_followable_item_handle(cx)?;
+                                let variant = item.to_state_message(cx)?;
+                                Some(proto::View {
+                                    id,
+                                    leader_id,
+                                    variant: Some(variant),
+                                })
+                            }
                         })
                     })
                     .collect(),
@@ -1705,6 +1712,18 @@ impl Workspace {
         None
     }
 
+    fn leader_for_pane(&self, pane: &ViewHandle<Pane>) -> Option<PeerId> {
+        self.follower_states_by_leader
+            .iter()
+            .find_map(|(leader_id, state)| {
+                if state.contains_key(pane) {
+                    Some(*leader_id)
+                } else {
+                    None
+                }
+            })
+    }
+
     fn update_leader_state(
         &mut self,
         leader_id: PeerId,