Broadcast active view to followers

Antonio Scandurra created

Change summary

crates/editor/src/items.rs        |   6 
crates/rpc/proto/zed.proto        |  38 +++++---
crates/server/src/rpc.rs          |  36 ++++++++
crates/workspace/src/pane.rs      |   5 
crates/workspace/src/workspace.rs | 145 ++++++++++++++++++++++++++------
5 files changed, 182 insertions(+), 48 deletions(-)

Detailed changes

crates/editor/src/items.rs 🔗

@@ -77,7 +77,7 @@ impl FollowedItem for Editor {
         &self,
         event: &Self::Event,
         cx: &AppContext,
-    ) -> Option<proto::view_update::Variant> {
+    ) -> Option<proto::update_followers::update_view::Variant> {
         match event {
             Event::SelectionsChanged => {
                 let selection = self.newest_anchor_selection();
@@ -88,8 +88,8 @@ impl FollowedItem for Editor {
                     reversed: selection.reversed,
                     goal: Default::default(),
                 };
-                Some(proto::view_update::Variant::Editor(
-                    proto::view_update::Editor {
+                Some(proto::update_followers::update_view::Variant::Editor(
+                    proto::update_followers::update_view::Editor {
                         newest_selection: Some(language::proto::serialize_selection(&selection)),
                     },
                 ))

crates/rpc/proto/zed.proto 🔗

@@ -544,16 +544,33 @@ message Follow {
 }
 
 message FollowResponse {
-    optional uint64 current_view_id = 1;
+    optional uint64 active_view_id = 1;
     repeated View views = 2;
 }
 
 message UpdateFollowers {
     uint64 project_id = 1;
-    uint64 current_view_id = 2;
-    repeated View created_views = 3;
-    repeated ViewUpdate updated_views = 4;
-    repeated uint32 follower_ids = 5;
+    repeated uint32 follower_ids = 2;
+    oneof variant {
+        UpdateActiveView update_active_view = 3;
+        View create_view = 4;
+        UpdateView update_view = 5;
+    }
+
+    message UpdateActiveView {
+        optional uint64 id = 1;
+    }
+
+    message UpdateView {
+        uint64 id = 1;
+        oneof variant {
+            Editor editor = 2;
+        }
+
+        message Editor {
+            Selection newest_selection = 1;
+        }
+    }
 }
 
 message Unfollow {
@@ -575,17 +592,6 @@ message View {
     }
 }
 
-message ViewUpdate {
-    uint64 id = 1;
-    oneof variant {
-        Editor editor = 2;
-    }
-
-    message Editor {
-        Selection newest_selection = 1;
-    }
-}
-
 message Collaborator {
     uint32 peer_id = 1;
     uint32 replica_id = 2;

crates/server/src/rpc.rs 🔗

@@ -114,6 +114,8 @@ impl Server {
             .add_message_handler(Server::leave_channel)
             .add_request_handler(Server::send_channel_message)
             .add_request_handler(Server::follow)
+            .add_message_handler(Server::unfollow)
+            .add_message_handler(Server::update_followers)
             .add_request_handler(Server::get_channel_messages);
 
         Arc::new(server)
@@ -690,6 +692,40 @@ impl Server {
         Ok(response)
     }
 
+    async fn unfollow(
+        self: Arc<Self>,
+        request: TypedEnvelope<proto::Unfollow>,
+    ) -> tide::Result<()> {
+        let leader_id = ConnectionId(request.payload.leader_id);
+        if !self
+            .state()
+            .project_connection_ids(request.payload.project_id, request.sender_id)?
+            .contains(&leader_id)
+        {
+            Err(anyhow!("no such peer"))?;
+        }
+        self.peer
+            .forward_send(request.sender_id, leader_id, request.payload)?;
+        Ok(())
+    }
+
+    async fn update_followers(
+        self: Arc<Self>,
+        request: TypedEnvelope<proto::UpdateFollowers>,
+    ) -> tide::Result<()> {
+        let connection_ids = self
+            .state()
+            .project_connection_ids(request.payload.project_id, request.sender_id)?;
+        for follower_id in &request.payload.follower_ids {
+            let follower_id = ConnectionId(*follower_id);
+            if connection_ids.contains(&follower_id) {
+                self.peer
+                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
+            }
+        }
+        Ok(())
+    }
+
     async fn get_channels(
         self: Arc<Server>,
         request: TypedEnvelope<proto::GetChannels>,

crates/workspace/src/pane.rs 🔗

@@ -321,11 +321,12 @@ impl Pane {
     pub(crate) fn add_item(
         workspace: &mut Workspace,
         pane: ViewHandle<Pane>,
-        mut item: Box<dyn ItemHandle>,
+        item: Box<dyn ItemHandle>,
         cx: &mut ViewContext<Workspace>,
     ) {
         // Prevent adding the same item to the pane more than once.
-        if pane.read(cx).items.iter().any(|i| i.id() == item.id()) {
+        if let Some(item_ix) = pane.read(cx).items.iter().position(|i| i.id() == item.id()) {
+            pane.update(cx, |pane, cx| pane.activate_item(item_ix, cx));
             return;
         }
 

crates/workspace/src/workspace.rs 🔗

@@ -43,6 +43,7 @@ use std::{
     sync::Arc,
 };
 use theme::{Theme, ThemeRegistry};
+use util::ResultExt;
 
 type ProjectItemBuilders = HashMap<
     TypeId,
@@ -118,6 +119,7 @@ pub fn init(client: &Arc<Client>, cx: &mut MutableAppContext) {
 
     client.add_view_request_handler(Workspace::handle_follow);
     client.add_view_message_handler(Workspace::handle_unfollow);
+    client.add_view_message_handler(Workspace::handle_update_followers);
 }
 
 pub fn register_project_item<I: ProjectItem>(cx: &mut MutableAppContext) {
@@ -246,7 +248,7 @@ pub trait FollowedItem: Item {
         &self,
         event: &Self::Event,
         cx: &AppContext,
-    ) -> Option<proto::view_update::Variant>;
+    ) -> Option<proto::update_followers::update_view::Variant>;
 }
 
 pub trait FollowedItemHandle {
@@ -256,7 +258,7 @@ pub trait FollowedItemHandle {
         &self,
         event: &dyn Any,
         cx: &AppContext,
-    ) -> Option<proto::view_update::Variant>;
+    ) -> Option<proto::update_followers::update_view::Variant>;
 }
 
 impl<T: FollowedItem> FollowedItemHandle for ViewHandle<T> {
@@ -272,7 +274,7 @@ impl<T: FollowedItem> FollowedItemHandle for ViewHandle<T> {
         &self,
         event: &dyn Any,
         cx: &AppContext,
-    ) -> Option<proto::view_update::Variant> {
+    ) -> Option<proto::update_followers::update_view::Variant> {
         self.read(cx).to_update_message(event.downcast_ref()?, cx)
     }
 }
@@ -361,6 +363,16 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
         pane: ViewHandle<Pane>,
         cx: &mut ViewContext<Workspace>,
     ) {
+        if let Some(followed_item) = self.to_followed_item_handle(cx) {
+            workspace.update_followers(
+                proto::update_followers::Variant::CreateView(proto::View {
+                    id: followed_item.id() as u64,
+                    variant: Some(followed_item.to_state_message(cx)),
+                }),
+                cx,
+            );
+        }
+
         let pane = pane.downgrade();
         cx.subscribe(self, move |workspace, item, event, cx| {
             let pane = if let Some(pane) = pane.upgrade(cx) {
@@ -391,7 +403,17 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
             if let Some(message) = item
                 .to_followed_item_handle(cx)
                 .and_then(|i| i.to_update_message(event, cx))
-            {}
+            {
+                workspace.update_followers(
+                    proto::update_followers::Variant::UpdateView(
+                        proto::update_followers::UpdateView {
+                            id: item.id() as u64,
+                            variant: Some(message),
+                        },
+                    ),
+                    cx,
+                );
+            }
         })
         .detach();
     }
@@ -553,18 +575,17 @@ pub struct Workspace {
     status_bar: ViewHandle<StatusBar>,
     project: ModelHandle<Project>,
     leader_state: LeaderState,
-    follower_states_by_leader: HashMap<PeerId, FollowerState>,
+    follower_states_by_leader: HashMap<PeerId, HashMap<WeakViewHandle<Pane>, FollowerState>>,
     _observe_current_user: Task<()>,
 }
 
 #[derive(Default)]
 struct LeaderState {
     followers: HashSet<PeerId>,
-    subscriptions: Vec<Subscription>,
 }
 
 struct FollowerState {
-    current_view_id: Option<usize>,
+    active_view_id: Option<usize>,
     items_by_leader_view_id: HashMap<usize, Box<dyn ItemHandle>>,
 }
 
@@ -1053,6 +1074,15 @@ impl Workspace {
             cx.focus(&self.active_pane);
             cx.notify();
         }
+
+        self.update_followers(
+            proto::update_followers::Variant::UpdateActiveView(
+                proto::update_followers::UpdateActiveView {
+                    id: self.active_item(cx).map(|item| item.id() as u64),
+                },
+            ),
+            cx,
+        );
     }
 
     fn handle_pane_event(
@@ -1179,7 +1209,7 @@ impl Workspace {
 
                     let items = futures::future::try_join_all(item_tasks).await?;
                     let follower_state = FollowerState {
-                        current_view_id: response.current_view_id.map(|id| id as usize),
+                        active_view_id: response.active_view_id.map(|id| id as usize),
                         items_by_leader_view_id: response
                             .views
                             .iter()
@@ -1187,22 +1217,12 @@ impl Workspace {
                             .zip(items)
                             .collect(),
                     };
-                    let current_item = if let Some(current_view_id) = follower_state.current_view_id
-                    {
-                        Some(
-                            follower_state
-                                .items_by_leader_view_id
-                                .get(&current_view_id)
-                                .ok_or_else(|| anyhow!("invalid current view id"))?
-                                .clone(),
-                        )
-                    } else {
-                        None
-                    };
                     this.update(&mut cx, |this, cx| {
-                        if let Some(item) = current_item {
-                            Pane::add_item(this, pane, item, cx);
-                        }
+                        this.follower_states_by_leader
+                            .entry(leader_id)
+                            .or_default()
+                            .insert(pane.downgrade(), follower_state);
+                        this.leader_updated(leader_id, cx);
                     });
                 }
                 Ok(())
@@ -1212,6 +1232,24 @@ impl Workspace {
         }
     }
 
+    fn update_followers(
+        &self,
+        update: proto::update_followers::Variant,
+        cx: &AppContext,
+    ) -> Option<()> {
+        let project_id = self.project.read(cx).remote_id()?;
+        if !self.leader_state.followers.is_empty() {
+            self.client
+                .send(proto::UpdateFollowers {
+                    project_id,
+                    follower_ids: self.leader_state.followers.iter().map(|f| f.0).collect(),
+                    variant: Some(update),
+                })
+                .log_err();
+        }
+        None
+    }
+
     fn render_connection_status(&self, cx: &mut RenderContext<Self>) -> Option<ElementBox> {
         let theme = &cx.global::<Settings>().theme;
         match &*self.client.status().borrow() {
@@ -1432,12 +1470,12 @@ impl Workspace {
                 .followers
                 .insert(envelope.original_sender_id()?);
 
-            let current_view_id = this
+            let active_view_id = this
                 .active_item(cx)
                 .and_then(|i| i.to_followed_item_handle(cx))
                 .map(|i| i.id() as u64);
             Ok(proto::FollowResponse {
-                current_view_id,
+                active_view_id,
                 views: this
                     .items(cx)
                     .filter_map(|item| {
@@ -1458,9 +1496,62 @@ impl Workspace {
         this: ViewHandle<Self>,
         envelope: TypedEnvelope<proto::Unfollow>,
         _: Arc<Client>,
-        cx: AsyncAppContext,
+        mut cx: AsyncAppContext,
     ) -> Result<()> {
-        Ok(())
+        this.update(&mut cx, |this, cx| {
+            this.leader_state
+                .followers
+                .remove(&envelope.original_sender_id()?);
+            Ok(())
+        })
+    }
+
+    async fn handle_update_followers(
+        this: ViewHandle<Self>,
+        envelope: TypedEnvelope<proto::UpdateFollowers>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            let leader_id = envelope.original_sender_id()?;
+            let follower_states = this
+                .follower_states_by_leader
+                .get_mut(&leader_id)
+                .ok_or_else(|| anyhow!("received follow update for an unfollowed peer"))?;
+            match envelope
+                .payload
+                .variant
+                .ok_or_else(|| anyhow!("invalid update"))?
+            {
+                proto::update_followers::Variant::UpdateActiveView(update_active_view) => {
+                    for (pane, state) in follower_states {
+                        state.active_view_id = update_active_view.id.map(|id| id as usize);
+                    }
+                }
+                proto::update_followers::Variant::CreateView(_) => todo!(),
+                proto::update_followers::Variant::UpdateView(_) => todo!(),
+            }
+
+            this.leader_updated(leader_id, cx);
+            Ok(())
+        })
+    }
+
+    fn leader_updated(&mut self, leader_id: PeerId, cx: &mut ViewContext<Self>) -> Option<()> {
+        let mut items_to_add = Vec::new();
+        for (pane, state) in self.follower_states_by_leader.get(&leader_id)? {
+            if let Some((pane, active_view_id)) = pane.upgrade(cx).zip(state.active_view_id) {
+                if let Some(item) = state.items_by_leader_view_id.get(&active_view_id) {
+                    items_to_add.push((pane, item.clone()));
+                }
+            }
+        }
+
+        for (pane, item) in items_to_add {
+            Pane::add_item(self, pane, item.clone(), cx);
+        }
+
+        None
     }
 }