Maintain collaborators in `UserStore`

Antonio Scandurra created

Change summary

server/src/rpc.rs   |  77 ++++++++++-----------------
zed/src/lib.rs      |   1 
zed/src/main.rs     |   7 -
zed/src/presence.rs | 130 ----------------------------------------------
zed/src/rpc.rs      |   6 +
zed/src/test.rs     |   1 
zed/src/user.rs     | 100 ++++++++++++++++++++++++++++++++++-
7 files changed, 131 insertions(+), 191 deletions(-)

Detailed changes

server/src/rpc.rs 🔗

@@ -267,13 +267,12 @@ impl Server {
             .await
             .user_id_for_connection(request.sender_id)?;
 
-        let mut collaborator_user_ids = Vec::new();
+        let mut collaborator_user_ids = HashSet::new();
+        collaborator_user_ids.insert(host_user_id);
         for github_login in request.payload.collaborator_logins {
             match self.app_state.db.create_user(&github_login, false).await {
                 Ok(collaborator_user_id) => {
-                    if collaborator_user_id != host_user_id {
-                        collaborator_user_ids.push(collaborator_user_id);
-                    }
+                    collaborator_user_ids.insert(collaborator_user_id);
                 }
                 Err(err) => {
                     let message = err.to_string();
@@ -285,24 +284,19 @@ impl Server {
             }
         }
 
-        let worktree_id;
-        let mut user_ids;
-        {
-            let mut state = self.state.write().await;
-            worktree_id = state.add_worktree(Worktree {
-                host_connection_id: request.sender_id,
-                collaborator_user_ids: collaborator_user_ids.clone(),
-                root_name: request.payload.root_name,
-                share: None,
-            });
-            user_ids = collaborator_user_ids;
-            user_ids.push(host_user_id);
-        }
+        let collaborator_user_ids = collaborator_user_ids.into_iter().collect::<Vec<_>>();
+        let worktree_id = self.state.write().await.add_worktree(Worktree {
+            host_connection_id: request.sender_id,
+            collaborator_user_ids: collaborator_user_ids.clone(),
+            root_name: request.payload.root_name,
+            share: None,
+        });
 
         self.peer
             .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
             .await?;
-        self.update_collaborators_for_users(&user_ids).await?;
+        self.update_collaborators_for_users(&collaborator_user_ids)
+            .await?;
 
         Ok(())
     }
@@ -311,11 +305,6 @@ impl Server {
         self: Arc<Server>,
         mut request: TypedEnvelope<proto::ShareWorktree>,
     ) -> tide::Result<()> {
-        let host_user_id = self
-            .state
-            .read()
-            .await
-            .user_id_for_connection(request.sender_id)?;
         let worktree = request
             .payload
             .worktree
@@ -333,15 +322,14 @@ impl Server {
                 active_replica_ids: Default::default(),
                 entries,
             });
-
-            let mut user_ids = worktree.collaborator_user_ids.clone();
-            user_ids.push(host_user_id);
+            let collaborator_user_ids = worktree.collaborator_user_ids.clone();
 
             drop(state);
             self.peer
                 .respond(request.receipt(), proto::ShareWorktreeResponse {})
                 .await?;
-            self.update_collaborators_for_users(&user_ids).await?;
+            self.update_collaborators_for_users(&collaborator_user_ids)
+                .await?;
         } else {
             self.peer
                 .respond_with_error(
@@ -360,14 +348,9 @@ impl Server {
         request: TypedEnvelope<proto::UnshareWorktree>,
     ) -> tide::Result<()> {
         let worktree_id = request.payload.worktree_id;
-        let host_user_id = self
-            .state
-            .read()
-            .await
-            .user_id_for_connection(request.sender_id)?;
 
         let connection_ids;
-        let mut user_ids;
+        let collaborator_user_ids;
         {
             let mut state = self.state.write().await;
             let worktree = state.write_worktree(worktree_id, request.sender_id)?;
@@ -376,8 +359,8 @@ impl Server {
             }
 
             connection_ids = worktree.connection_ids();
-            user_ids = worktree.collaborator_user_ids.clone();
-            user_ids.push(host_user_id);
+            collaborator_user_ids = worktree.collaborator_user_ids.clone();
+
             worktree.share.take();
             for connection_id in &connection_ids {
                 if let Some(connection) = state.connections.get_mut(connection_id) {
@@ -391,7 +374,8 @@ impl Server {
                 .send(conn_id, proto::UnshareWorktree { worktree_id })
         })
         .await?;
-        self.update_collaborators_for_users(&user_ids).await?;
+        self.update_collaborators_for_users(&collaborator_user_ids)
+            .await?;
 
         Ok(())
     }
@@ -409,7 +393,7 @@ impl Server {
 
         let response;
         let connection_ids;
-        let mut user_ids;
+        let collaborator_user_ids;
         let mut state = self.state.write().await;
         match state.join_worktree(request.sender_id, user_id, worktree_id) {
             Ok((peer_replica_id, worktree)) => {
@@ -437,11 +421,8 @@ impl Server {
                     replica_id: peer_replica_id as u32,
                     peers,
                 };
-
-                let host_connection_id = worktree.host_connection_id;
                 connection_ids = worktree.connection_ids();
-                user_ids = worktree.collaborator_user_ids.clone();
-                user_ids.push(state.user_id_for_connection(host_connection_id)?);
+                collaborator_user_ids = worktree.collaborator_user_ids.clone();
             }
             Err(error) => {
                 self.peer
@@ -471,7 +452,8 @@ impl Server {
         })
         .await?;
         self.peer.respond(request.receipt(), response).await?;
-        self.update_collaborators_for_users(&user_ids).await?;
+        self.update_collaborators_for_users(&collaborator_user_ids)
+            .await?;
 
         Ok(())
     }
@@ -490,16 +472,14 @@ impl Server {
         sender_conn_id: ConnectionId,
     ) -> tide::Result<()> {
         let connection_ids;
-        let mut user_ids;
-
+        let collaborator_user_ids;
         let mut is_host = false;
         let mut is_guest = false;
         {
             let mut state = self.state.write().await;
             let worktree = state.write_worktree(worktree_id, sender_conn_id)?;
-            let host_connection_id = worktree.host_connection_id;
             connection_ids = worktree.connection_ids();
-            user_ids = worktree.collaborator_user_ids.clone();
+            collaborator_user_ids = worktree.collaborator_user_ids.clone();
 
             if worktree.host_connection_id == sender_conn_id {
                 is_host = true;
@@ -511,8 +491,6 @@ impl Server {
                     share.active_replica_ids.remove(&replica_id);
                 }
             }
-
-            user_ids.push(state.user_id_for_connection(host_connection_id)?);
         }
 
         if is_host {
@@ -533,7 +511,8 @@ impl Server {
             })
             .await?
         }
-        self.update_collaborators_for_users(&user_ids).await?;
+        self.update_collaborators_for_users(&collaborator_user_ids)
+            .await?;
         Ok(())
     }
 

zed/src/lib.rs 🔗

@@ -9,7 +9,6 @@ pub mod http;
 pub mod language;
 pub mod menus;
 pub mod people_panel;
-pub mod presence;
 pub mod project_browser;
 pub mod rpc;
 pub mod settings;

zed/src/main.rs 🔗

@@ -13,9 +13,7 @@ use zed::{
     channel::ChannelList,
     chat_panel, editor, file_finder,
     fs::RealFs,
-    http, language, menus,
-    presence::Presence,
-    rpc, settings, theme_selector,
+    http, language, menus, rpc, settings, theme_selector,
     user::UserStore,
     workspace::{self, OpenNew, OpenParams, OpenPaths},
     AppState,
@@ -40,14 +38,13 @@ fn main() {
     app.run(move |cx| {
         let rpc = rpc::Client::new();
         let http = http::client();
-        let user_store = UserStore::new(rpc.clone(), http.clone(), cx.background());
+        let user_store = cx.add_model(|cx| UserStore::new(rpc.clone(), http.clone(), cx));
         let app_state = Arc::new(AppState {
             languages: languages.clone(),
             settings_tx: Arc::new(Mutex::new(settings_tx)),
             settings,
             themes,
             channel_list: cx.add_model(|cx| ChannelList::new(user_store.clone(), rpc.clone(), cx)),
-            presence: cx.add_model(|cx| Presence::new(user_store.clone(), rpc.clone(), cx)),
             rpc,
             user_store,
             fs: Arc::new(RealFs),

zed/src/presence.rs 🔗

@@ -1,130 +0,0 @@
-use crate::{
-    rpc::Client,
-    user::{User, UserStore},
-    util::TryFutureExt,
-};
-use anyhow::Result;
-use gpui::{Entity, ModelContext, Task};
-use postage::prelude::Stream;
-use smol::future::FutureExt;
-use std::{collections::HashSet, sync::Arc, time::Duration};
-use zrpc::proto;
-
-pub struct Presence {
-    collaborators: Vec<Collaborator>,
-    user_store: Arc<UserStore>,
-    rpc: Arc<Client>,
-    _maintain_people: Task<()>,
-}
-
-#[derive(Debug)]
-struct Collaborator {
-    user: Arc<User>,
-    worktrees: Vec<WorktreeMetadata>,
-}
-
-#[derive(Debug)]
-struct WorktreeMetadata {
-    root_name: String,
-    is_shared: bool,
-    participants: Vec<Arc<User>>,
-}
-
-impl Presence {
-    pub fn new(user_store: Arc<UserStore>, rpc: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
-        let _maintain_collaborators = cx.spawn_weak(|this, mut cx| {
-            let user_store = user_store.clone();
-            let foreground = cx.foreground();
-            async move {
-                let mut current_user = user_store.watch_current_user();
-                loop {
-                    let timer = foreground.timer(Duration::from_secs(2));
-                    let next_current_user = async {
-                        current_user.recv().await;
-                    };
-
-                    next_current_user.race(timer).await;
-                    if current_user.borrow().is_some() {
-                        if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
-                            this.update(&mut cx, |this, cx| this.refresh(cx))
-                                .log_err()
-                                .await;
-                        }
-                    }
-                }
-            }
-        });
-
-        Self {
-            collaborators: Vec::new(),
-            user_store,
-            rpc,
-            _maintain_people: _maintain_collaborators,
-        }
-    }
-
-    fn refresh(&self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
-        cx.spawn(|this, mut cx| {
-            let rpc = self.rpc.clone();
-            let user_store = self.user_store.clone();
-            async move {
-                //     let response = rpc.request(proto::GetCollaborators {}).await?;
-                //     let mut user_ids = HashSet::new();
-                //     for collaborator in &response.collaborators {
-                //         user_ids.insert(collaborator.user_id);
-                //         user_ids.extend(
-                //             collaborator
-                //                 .worktrees
-                //                 .iter()
-                //                 .flat_map(|w| &w.participants)
-                //                 .copied(),
-                //         );
-                //     }
-                //     user_store
-                //         .load_users(user_ids.into_iter().collect())
-                //         .await?;
-
-                //     let mut collaborators = Vec::new();
-                //     for collaborator in response.collaborators {
-                //         collaborators.push(Collaborator::from_proto(collaborator, &user_store).await?);
-                //     }
-
-                //     this.update(&mut cx, |this, cx| {
-                //         this.collaborators = collaborators;
-                //         cx.notify();
-                //     });
-
-                Ok(())
-            }
-        })
-    }
-}
-
-pub enum Event {}
-
-impl Entity for Presence {
-    type Event = Event;
-}
-
-// impl Collaborator {
-//     async fn from_proto(
-//         collaborator: proto::Collaborator,
-//         user_store: &Arc<UserStore>,
-//         cx: &mut AsyncAppContext,
-//     ) -> Result<Self> {
-//         let user = user_store.fetch_user(collaborator.user_id).await?;
-//         let mut worktrees = Vec::new();
-//         for worktree in collaborator.worktrees {
-//             let mut participants = Vec::new();
-//             for participant_id in worktree.participants {
-//                 participants.push(user_store.fetch_user(participant_id).await?);
-//             }
-//             worktrees.push(WorktreeMetadata {
-//                 root_name: worktree.root_name,
-//                 is_shared: worktree.is_shared,
-//                 participants,
-//             });
-//         }
-//         Ok(Self { user, worktrees })
-//     }
-// }

zed/src/rpc.rs 🔗

@@ -230,7 +230,11 @@ impl Client {
         }
     }
 
-    pub fn subscribe<T, M, F>(self: &Arc<Self>, cx: ModelContext<M>, mut handler: F) -> Subscription
+    pub fn subscribe<T, M, F>(
+        self: &Arc<Self>,
+        cx: &mut ModelContext<M>,
+        mut handler: F,
+    ) -> Subscription
     where
         T: EnvelopedMessage,
         M: Entity,

zed/src/test.rs 🔗

@@ -4,7 +4,6 @@ use crate::{
     fs::RealFs,
     http::{HttpClient, Request, Response, ServerResponse},
     language::LanguageRegistry,
-    presence::Presence,
     rpc::{self, Client, Credentials, EstablishConnectionError},
     settings::{self, ThemeRegistry},
     time::ReplicaId,

zed/src/user.rs 🔗

@@ -1,14 +1,17 @@
 use crate::{
     http::{HttpClient, Method, Request, Url},
-    rpc::{Client, Status},
+    rpc::{self, Client, Status},
     util::TryFutureExt,
 };
 use anyhow::{anyhow, Context, Result};
 use futures::future;
-use gpui::{Entity, ImageData, ModelContext, Task};
+use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
 use postage::{prelude::Stream, sink::Sink, watch};
-use std::{collections::HashMap, sync::Arc};
-use zrpc::proto;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
+use zrpc::{proto, TypedEnvelope};
 
 #[derive(Debug)]
 pub struct User {
@@ -17,11 +20,26 @@ pub struct User {
     pub avatar: Option<Arc<ImageData>>,
 }
 
+#[derive(Debug)]
+struct Collaborator {
+    pub user: Arc<User>,
+    pub worktrees: Vec<WorktreeMetadata>,
+}
+
+#[derive(Debug)]
+struct WorktreeMetadata {
+    pub root_name: String,
+    pub is_shared: bool,
+    pub participants: Vec<Arc<User>>,
+}
+
 pub struct UserStore {
     users: HashMap<u64, Arc<User>>,
     current_user: watch::Receiver<Option<Arc<User>>>,
+    collaborators: Vec<Collaborator>,
     rpc: Arc<Client>,
     http: Arc<dyn HttpClient>,
+    _maintain_collaborators: rpc::Subscription,
     _maintain_current_user: Task<()>,
 }
 
@@ -37,8 +55,10 @@ impl UserStore {
         Self {
             users: Default::default(),
             current_user: current_user_rx,
+            collaborators: Default::default(),
             rpc: rpc.clone(),
             http,
+            _maintain_collaborators: rpc.subscribe(cx, Self::update_collaborators),
             _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                 let mut status = rpc.status();
                 while let Some(status) = status.recv().await {
@@ -62,6 +82,45 @@ impl UserStore {
         }
     }
 
+    fn update_collaborators(
+        &mut self,
+        message: TypedEnvelope<proto::UpdateCollaborators>,
+        _: Arc<Client>,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        let mut user_ids = HashSet::new();
+        for collaborator in &message.payload.collaborators {
+            user_ids.insert(collaborator.user_id);
+            user_ids.extend(
+                collaborator
+                    .worktrees
+                    .iter()
+                    .flat_map(|w| &w.participants)
+                    .copied(),
+            );
+        }
+
+        let load_users = self.load_users(user_ids.into_iter().collect(), cx);
+        cx.spawn(|this, mut cx| async move {
+            load_users.await?;
+
+            let mut collaborators = Vec::new();
+            for collaborator in message.payload.collaborators {
+                collaborators.push(Collaborator::from_proto(collaborator, &this, &mut cx).await?);
+            }
+
+            this.update(&mut cx, |this, cx| {
+                this.collaborators = collaborators;
+                cx.notify();
+            });
+
+            Result::<_, anyhow::Error>::Ok(())
+        })
+        .detach();
+
+        Ok(())
+    }
+
     pub fn load_users(
         &mut self,
         mut user_ids: Vec<u64>,
@@ -134,6 +193,39 @@ impl User {
     }
 }
 
+impl Collaborator {
+    async fn from_proto(
+        collaborator: proto::Collaborator,
+        user_store: &ModelHandle<UserStore>,
+        cx: &mut AsyncAppContext,
+    ) -> Result<Self> {
+        let user = user_store
+            .update(cx, |user_store, cx| {
+                user_store.fetch_user(collaborator.user_id, cx)
+            })
+            .await?;
+        let mut worktrees = Vec::new();
+        for worktree in collaborator.worktrees {
+            let mut participants = Vec::new();
+            for participant_id in worktree.participants {
+                participants.push(
+                    user_store
+                        .update(cx, |user_store, cx| {
+                            user_store.fetch_user(participant_id, cx)
+                        })
+                        .await?,
+                );
+            }
+            worktrees.push(WorktreeMetadata {
+                root_name: worktree.root_name,
+                is_shared: worktree.is_shared,
+                participants,
+            });
+        }
+        Ok(Self { user, worktrees })
+    }
+}
+
 async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
     let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
     let mut request = Request::new(Method::Get, url);