Introduce a WorkspaceStore for handling following

Max Brunsfeld created

Change summary

crates/call/src/call.rs                | 205 +-------------------------
crates/collab/src/tests/test_server.rs |   4 
crates/workspace/src/workspace.rs      | 212 ++++++++++++++++++++++++---
crates/zed/src/main.rs                 |   4 
4 files changed, 201 insertions(+), 224 deletions(-)

Detailed changes

crates/call/src/call.rs 🔗

@@ -2,29 +2,23 @@ pub mod call_settings;
 pub mod participant;
 pub mod room;
 
-use std::sync::Arc;
-
 use anyhow::{anyhow, Result};
 use audio::Audio;
 use call_settings::CallSettings;
 use channel::ChannelId;
-use client::{
-    proto::{self, PeerId},
-    ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
-};
+use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
 use collections::HashSet;
 use futures::{future::Shared, FutureExt};
-use postage::watch;
-
 use gpui::{
-    AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext,
-    ModelHandle, Subscription, Task, ViewContext, WeakModelHandle,
+    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
+    WeakModelHandle,
 };
+use postage::watch;
 use project::Project;
+use std::sync::Arc;
 
 pub use participant::ParticipantLocation;
 pub use room::Room;
-use util::ResultExt;
 
 pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
     settings::register::<CallSettings>(cx);
@@ -53,25 +47,9 @@ pub struct ActiveCall {
     ),
     client: Arc<Client>,
     user_store: ModelHandle<UserStore>,
-    follow_handlers: Vec<FollowHandler>,
-    followers: Vec<Follower>,
     _subscriptions: Vec<client::Subscription>,
 }
 
-#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
-struct Follower {
-    project_id: Option<u64>,
-    peer_id: PeerId,
-}
-
-struct FollowHandler {
-    project_id: Option<u64>,
-    root_view: AnyWeakViewHandle,
-    get_views:
-        Box<dyn Fn(&AnyViewHandle, Option<u64>, &mut AppContext) -> Option<proto::FollowResponse>>,
-    update_view: Box<dyn Fn(&AnyViewHandle, PeerId, proto::UpdateFollowers, &mut AppContext)>,
-}
-
 impl Entity for ActiveCall {
     type Event = room::Event;
 }
@@ -88,14 +66,10 @@ impl ActiveCall {
             location: None,
             pending_invites: Default::default(),
             incoming_call: watch::channel(),
-            follow_handlers: Default::default(),
-            followers: Default::default(),
+
             _subscriptions: vec![
                 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
                 client.add_message_handler(cx.handle(), Self::handle_call_canceled),
-                client.add_request_handler(cx.handle(), Self::handle_follow),
-                client.add_message_handler(cx.handle(), Self::handle_unfollow),
-                client.add_message_handler(cx.handle(), Self::handle_update_from_leader),
             ],
             client,
             user_store,
@@ -106,48 +80,6 @@ impl ActiveCall {
         self.room()?.read(cx).channel_id()
     }
 
-    pub fn add_follow_handler<V: gpui::View, GetViews, UpdateView>(
-        &mut self,
-        root_view: gpui::ViewHandle<V>,
-        project_id: Option<u64>,
-        get_views: GetViews,
-        update_view: UpdateView,
-        _cx: &mut ModelContext<Self>,
-    ) where
-        GetViews: 'static
-            + Fn(&mut V, Option<u64>, &mut gpui::ViewContext<V>) -> Result<proto::FollowResponse>,
-        UpdateView:
-            'static + Fn(&mut V, PeerId, proto::UpdateFollowers, &mut ViewContext<V>) -> Result<()>,
-    {
-        self.follow_handlers
-            .retain(|h| h.root_view.id() != root_view.id());
-        if let Err(ix) = self
-            .follow_handlers
-            .binary_search_by_key(&(project_id, root_view.id()), |f| {
-                (f.project_id, f.root_view.id())
-            })
-        {
-            self.follow_handlers.insert(
-                ix,
-                FollowHandler {
-                    project_id,
-                    root_view: root_view.into_any().downgrade(),
-                    get_views: Box::new(move |view, project_id, cx| {
-                        let view = view.clone().downcast::<V>().unwrap();
-                        view.update(cx, |view, cx| get_views(view, project_id, cx).log_err())
-                            .flatten()
-                    }),
-                    update_view: Box::new(move |view, leader_id, message, cx| {
-                        let view = view.clone().downcast::<V>().unwrap();
-                        view.update(cx, |view, cx| {
-                            update_view(view, leader_id, message, cx).log_err()
-                        });
-                    }),
-                },
-            );
-        }
-    }
-
     async fn handle_incoming_call(
         this: ModelHandle<Self>,
         envelope: TypedEnvelope<proto::IncomingCall>,
@@ -194,127 +126,6 @@ impl ActiveCall {
         Ok(())
     }
 
-    async fn handle_follow(
-        this: ModelHandle<Self>,
-        envelope: TypedEnvelope<proto::Follow>,
-        _: Arc<Client>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::FollowResponse> {
-        this.update(&mut cx, |this, cx| {
-            let follower = Follower {
-                project_id: envelope.payload.project_id,
-                peer_id: envelope.original_sender_id()?,
-            };
-            let active_project_id = this
-                .location
-                .as_ref()
-                .and_then(|project| project.upgrade(cx)?.read(cx).remote_id());
-
-            let mut response = proto::FollowResponse::default();
-            for handler in &this.follow_handlers {
-                if follower.project_id != handler.project_id && follower.project_id.is_some() {
-                    continue;
-                }
-
-                let Some(root_view) = handler.root_view.upgrade(cx) else {
-                    continue;
-                };
-
-                let Some(handler_response) =
-                    (handler.get_views)(&root_view, follower.project_id, cx)
-                else {
-                    continue;
-                };
-
-                if response.views.is_empty() {
-                    response.views = handler_response.views;
-                } else {
-                    response.views.extend_from_slice(&handler_response.views);
-                }
-
-                if let Some(active_view_id) = handler_response.active_view_id.clone() {
-                    if response.active_view_id.is_none() || handler.project_id == active_project_id
-                    {
-                        response.active_view_id = Some(active_view_id);
-                    }
-                }
-            }
-
-            if let Err(ix) = this.followers.binary_search(&follower) {
-                this.followers.insert(ix, follower);
-            }
-
-            Ok(response)
-        })
-    }
-
-    async fn handle_unfollow(
-        this: ModelHandle<Self>,
-        envelope: TypedEnvelope<proto::Unfollow>,
-        _: Arc<Client>,
-        mut cx: AsyncAppContext,
-    ) -> Result<()> {
-        this.update(&mut cx, |this, _| {
-            let follower = Follower {
-                project_id: envelope.payload.project_id,
-                peer_id: envelope.original_sender_id()?,
-            };
-            if let Ok(ix) = this.followers.binary_search(&follower) {
-                this.followers.remove(ix);
-            }
-            Ok(())
-        })
-    }
-
-    async fn handle_update_from_leader(
-        this: ModelHandle<Self>,
-        envelope: TypedEnvelope<proto::UpdateFollowers>,
-        _: Arc<Client>,
-        mut cx: AsyncAppContext,
-    ) -> Result<()> {
-        let leader_id = envelope.original_sender_id()?;
-        let update = envelope.payload;
-        this.update(&mut cx, |this, cx| {
-            for handler in &this.follow_handlers {
-                if update.project_id != handler.project_id && update.project_id.is_some() {
-                    continue;
-                }
-                let Some(root_view) = handler.root_view.upgrade(cx) else {
-                    continue;
-                };
-                (handler.update_view)(&root_view, leader_id, update.clone(), cx);
-            }
-            Ok(())
-        })
-    }
-
-    pub fn update_followers(
-        &self,
-        project_id: Option<u64>,
-        update: proto::update_followers::Variant,
-        cx: &AppContext,
-    ) -> Option<()> {
-        let room_id = self.room()?.read(cx).id();
-        let follower_ids: Vec<_> = self
-            .followers
-            .iter()
-            .filter_map(|follower| {
-                (follower.project_id == project_id).then_some(follower.peer_id.into())
-            })
-            .collect();
-        if follower_ids.is_empty() {
-            return None;
-        }
-        self.client
-            .send(proto::UpdateFollowers {
-                room_id,
-                project_id,
-                follower_ids,
-                variant: Some(update),
-            })
-            .log_err()
-    }
-
     pub fn global(cx: &AppContext) -> ModelHandle<Self> {
         cx.global::<ModelHandle<Self>>().clone()
     }
@@ -536,6 +347,10 @@ impl ActiveCall {
         }
     }
 
+    pub fn location(&self) -> Option<&WeakModelHandle<Project>> {
+        self.location.as_ref()
+    }
+
     pub fn set_location(
         &mut self,
         project: Option<&ModelHandle<Project>>,

crates/collab/src/tests/test_server.rs 🔗

@@ -29,7 +29,7 @@ use std::{
     },
 };
 use util::http::FakeHttpClient;
-use workspace::Workspace;
+use workspace::{Workspace, WorkspaceStore};
 
 pub struct TestServer {
     pub app_state: Arc<AppState>,
@@ -204,6 +204,7 @@ impl TestServer {
 
         let fs = FakeFs::new(cx.background());
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
+        let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx));
         let channel_store =
             cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
         let mut language_registry = LanguageRegistry::test();
@@ -211,6 +212,7 @@ impl TestServer {
         let app_state = Arc::new(workspace::AppState {
             client: client.clone(),
             user_store: user_store.clone(),
+            workspace_store,
             channel_store: channel_store.clone(),
             languages: Arc::new(language_registry),
             fs: fs.clone(),

crates/workspace/src/workspace.rs 🔗

@@ -15,7 +15,7 @@ use call::ActiveCall;
 use channel::ChannelStore;
 use client::{
     proto::{self, PeerId},
-    Client, UserStore,
+    Client, TypedEnvelope, UserStore,
 };
 use collections::{hash_map, HashMap, HashSet};
 use drag_and_drop::DragAndDrop;
@@ -451,6 +451,7 @@ pub struct AppState {
     pub client: Arc<Client>,
     pub user_store: ModelHandle<UserStore>,
     pub channel_store: ModelHandle<ChannelStore>,
+    pub workspace_store: ModelHandle<WorkspaceStore>,
     pub fs: Arc<dyn fs::Fs>,
     pub build_window_options:
         fn(Option<WindowBounds>, Option<uuid::Uuid>, &dyn Platform) -> WindowOptions<'static>,
@@ -459,6 +460,19 @@ pub struct AppState {
     pub background_actions: BackgroundActions,
 }
 
+pub struct WorkspaceStore {
+    workspaces: HashSet<WeakViewHandle<Workspace>>,
+    followers: Vec<Follower>,
+    client: Arc<Client>,
+    _subscriptions: Vec<client::Subscription>,
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
+struct Follower {
+    project_id: Option<u64>,
+    peer_id: PeerId,
+}
+
 impl AppState {
     #[cfg(any(test, feature = "test-support"))]
     pub fn test(cx: &mut AppContext) -> Arc<Self> {
@@ -475,6 +489,7 @@ impl AppState {
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
         let channel_store =
             cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
+        let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx));
 
         theme::init((), cx);
         client::init(&client, cx);
@@ -486,6 +501,7 @@ impl AppState {
             languages,
             user_store,
             channel_store,
+            workspace_store,
             initialize_workspace: |_, _, _, _| Task::ready(Ok(())),
             build_window_options: |_, _, _| Default::default(),
             background_actions: || &[],
@@ -663,6 +679,10 @@ impl Workspace {
         cx.focus(&center_pane);
         cx.emit(Event::PaneAdded(center_pane.clone()));
 
+        app_state.workspace_store.update(cx, |store, _| {
+            store.workspaces.insert(weak_handle.clone());
+        });
+
         let mut current_user = app_state.user_store.read(cx).watch_current_user();
         let mut connection_status = app_state.client.status();
         let _observe_current_user = cx.spawn(|this, mut cx| async move {
@@ -2492,19 +2512,8 @@ impl Workspace {
         &self.active_pane
     }
 
-    fn project_remote_id_changed(&mut self, remote_id: Option<u64>, cx: &mut ViewContext<Self>) {
-        let handle = cx.handle();
-        if let Some(call) = self.active_call() {
-            call.update(cx, |call, cx| {
-                call.add_follow_handler(
-                    handle,
-                    remote_id,
-                    Self::get_views_for_followers,
-                    Self::handle_update_followers,
-                    cx,
-                );
-            });
-        }
+    fn project_remote_id_changed(&mut self, _project_id: Option<u64>, _cx: &mut ViewContext<Self>) {
+        // TODO
     }
 
     fn collaborator_left(&mut self, peer_id: PeerId, cx: &mut ViewContext<Self>) {
@@ -2793,11 +2802,7 @@ impl Workspace {
 
     // RPC handlers
 
-    fn get_views_for_followers(
-        &mut self,
-        _project_id: Option<u64>,
-        cx: &mut ViewContext<Self>,
-    ) -> Result<proto::FollowResponse> {
+    fn handle_follow(&mut self, cx: &mut ViewContext<Self>) -> proto::FollowResponse {
         let client = &self.app_state.client;
 
         let active_view_id = self.active_item(cx).and_then(|i| {
@@ -2810,7 +2815,7 @@ impl Workspace {
 
         cx.notify();
 
-        Ok(proto::FollowResponse {
+        proto::FollowResponse {
             active_view_id,
             views: self
                 .panes()
@@ -2832,7 +2837,7 @@ impl Workspace {
                     })
                 })
                 .collect(),
-        })
+        }
     }
 
     fn handle_update_followers(
@@ -2840,10 +2845,10 @@ impl Workspace {
         leader_id: PeerId,
         message: proto::UpdateFollowers,
         _cx: &mut ViewContext<Self>,
-    ) -> Result<()> {
+    ) {
         self.leader_updates_tx
-            .unbounded_send((leader_id, message))?;
-        Ok(())
+            .unbounded_send((leader_id, message))
+            .ok();
     }
 
     async fn process_leader_update(
@@ -2999,9 +3004,9 @@ impl Workspace {
         update: proto::update_followers::Variant,
         cx: &AppContext,
     ) -> Option<()> {
-        self.active_call()?
-            .read(cx)
-            .update_followers(self.project.read(cx).remote_id(), update, cx)
+        self.app_state().workspace_store.read_with(cx, |store, cx| {
+            store.update_followers(self.project.read(cx).remote_id(), update, cx)
+        })
     }
 
     pub fn leader_for_pane(&self, pane: &ViewHandle<Pane>) -> Option<PeerId> {
@@ -3472,8 +3477,10 @@ impl Workspace {
 
         let channel_store =
             cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
+        let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx));
         let app_state = Arc::new(AppState {
             languages: project.read(cx).languages().clone(),
+            workspace_store,
             client,
             user_store,
             channel_store,
@@ -3717,6 +3724,12 @@ fn notify_if_database_failed(workspace: &WeakViewHandle<Workspace>, cx: &mut Asy
 
 impl Entity for Workspace {
     type Event = Event;
+
+    fn release(&mut self, cx: &mut AppContext) {
+        self.app_state.workspace_store.update(cx, |store, _| {
+            store.workspaces.remove(&self.weak_self);
+        })
+    }
 }
 
 impl View for Workspace {
@@ -3859,6 +3872,151 @@ impl View for Workspace {
     }
 }
 
+impl WorkspaceStore {
+    pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
+        Self {
+            workspaces: Default::default(),
+            followers: Default::default(),
+            _subscriptions: vec![
+                client.add_request_handler(cx.handle(), Self::handle_follow),
+                client.add_message_handler(cx.handle(), Self::handle_unfollow),
+                client.add_message_handler(cx.handle(), Self::handle_update_from_leader),
+            ],
+            client,
+        }
+    }
+
+    pub fn update_followers(
+        &self,
+        project_id: Option<u64>,
+        update: proto::update_followers::Variant,
+        cx: &AppContext,
+    ) -> Option<()> {
+        if !cx.has_global::<ModelHandle<ActiveCall>>() {
+            return None;
+        }
+
+        let room_id = ActiveCall::global(cx).read(cx).room()?.read(cx).id();
+        let follower_ids: Vec<_> = self
+            .followers
+            .iter()
+            .filter_map(|follower| {
+                (follower.project_id == project_id).then_some(follower.peer_id.into())
+            })
+            .collect();
+        if follower_ids.is_empty() {
+            return None;
+        }
+        self.client
+            .send(proto::UpdateFollowers {
+                room_id,
+                project_id,
+                follower_ids,
+                variant: Some(update),
+            })
+            .log_err()
+    }
+
+    async fn handle_follow(
+        this: ModelHandle<Self>,
+        envelope: TypedEnvelope<proto::Follow>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<proto::FollowResponse> {
+        this.update(&mut cx, |this, cx| {
+            let follower = Follower {
+                project_id: envelope.payload.project_id,
+                peer_id: envelope.original_sender_id()?,
+            };
+            let active_project_id = ActiveCall::global(cx)
+                .read(cx)
+                .location()
+                .as_ref()
+                .and_then(|project| project.upgrade(cx)?.read(cx).remote_id());
+
+            let mut response = proto::FollowResponse::default();
+            for workspace in &this.workspaces {
+                let Some(workspace) = workspace.upgrade(cx) else {
+                    continue;
+                };
+
+                workspace.update(cx.as_mut(), |workspace, cx| {
+                    let project_id = workspace.project.read(cx).remote_id();
+                    if follower.project_id != project_id && follower.project_id.is_some() {
+                        return;
+                    }
+
+                    let handler_response = workspace.handle_follow(cx);
+                    if response.views.is_empty() {
+                        response.views = handler_response.views;
+                    } else {
+                        response.views.extend_from_slice(&handler_response.views);
+                    }
+
+                    if let Some(active_view_id) = handler_response.active_view_id.clone() {
+                        if response.active_view_id.is_none() || project_id == active_project_id {
+                            response.active_view_id = Some(active_view_id);
+                        }
+                    }
+                });
+            }
+
+            if let Err(ix) = this.followers.binary_search(&follower) {
+                this.followers.insert(ix, follower);
+            }
+
+            Ok(response)
+        })
+    }
+
+    async fn handle_unfollow(
+        this: ModelHandle<Self>,
+        envelope: TypedEnvelope<proto::Unfollow>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, _| {
+            let follower = Follower {
+                project_id: envelope.payload.project_id,
+                peer_id: envelope.original_sender_id()?,
+            };
+            if let Ok(ix) = this.followers.binary_search(&follower) {
+                this.followers.remove(ix);
+            }
+            Ok(())
+        })
+    }
+
+    async fn handle_update_from_leader(
+        this: ModelHandle<Self>,
+        envelope: TypedEnvelope<proto::UpdateFollowers>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let leader_id = envelope.original_sender_id()?;
+        let update = envelope.payload;
+        this.update(&mut cx, |this, cx| {
+            for workspace in &this.workspaces {
+                let Some(workspace) = workspace.upgrade(cx) else {
+                    continue;
+                };
+                workspace.update(cx.as_mut(), |workspace, cx| {
+                    let project_id = workspace.project.read(cx).remote_id();
+                    if update.project_id != project_id && update.project_id.is_some() {
+                        return;
+                    }
+                    workspace.handle_update_followers(leader_id, update.clone(), cx);
+                });
+            }
+            Ok(())
+        })
+    }
+}
+
+impl Entity for WorkspaceStore {
+    type Event = ();
+}
+
 impl ViewId {
     pub(crate) fn from_proto(message: proto::ViewId) -> Result<Self> {
         Ok(Self {

crates/zed/src/main.rs 🔗

@@ -54,7 +54,7 @@ use welcome::{show_welcome_experience, FIRST_OPEN};
 
 use fs::RealFs;
 use util::{channel::RELEASE_CHANNEL, paths, ResultExt, TryFutureExt};
-use workspace::AppState;
+use workspace::{AppState, WorkspaceStore};
 use zed::{
     assets::Assets,
     build_window_options, handle_keymap_file_changes, initialize_workspace, languages, menus,
@@ -139,6 +139,7 @@ fn main() {
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http.clone(), cx));
         let channel_store =
             cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
+        let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx));
 
         cx.set_global(client.clone());
 
@@ -187,6 +188,7 @@ fn main() {
             build_window_options,
             initialize_workspace,
             background_actions,
+            workspace_store,
         });
         cx.set_global(Arc::downgrade(&app_state));