client: Re-fetch the authenticated user when receiving a `UserUpdated` message from Cloud (#35807)

Marshall Bowers and Richard created

This PR wires up handling for the new `UserUpdated` message coming from
Cloud over the WebSocket connection.

When we receive this message we will refresh the authenticated user.

Release Notes:

- N/A

Co-authored-by: Richard <richard@zed.dev>

Change summary

crates/client/src/client.rs | 24 ++++++++++++++++++++----
crates/client/src/user.rs   | 33 +++++++++++++++++++++++++++++++++
2 files changed, 53 insertions(+), 4 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -192,6 +192,8 @@ pub fn init(client: &Arc<Client>, cx: &mut App) {
     });
 }
 
+pub type MessageToClientHandler = Box<dyn Fn(&MessageToClient, &App) + Send + Sync + 'static>;
+
 struct GlobalClient(Arc<Client>);
 
 impl Global for GlobalClient {}
@@ -205,6 +207,7 @@ pub struct Client {
     credentials_provider: ClientCredentialsProvider,
     state: RwLock<ClientState>,
     handler_set: parking_lot::Mutex<ProtoMessageHandlerSet>,
+    message_to_client_handlers: parking_lot::Mutex<Vec<MessageToClientHandler>>,
 
     #[allow(clippy::type_complexity)]
     #[cfg(any(test, feature = "test-support"))]
@@ -554,6 +557,7 @@ impl Client {
             credentials_provider: ClientCredentialsProvider::new(cx),
             state: Default::default(),
             handler_set: Default::default(),
+            message_to_client_handlers: parking_lot::Mutex::new(Vec::new()),
 
             #[cfg(any(test, feature = "test-support"))]
             authenticate: Default::default(),
@@ -1651,10 +1655,22 @@ impl Client {
         }
     }
 
-    fn handle_message_to_client(self: &Arc<Client>, message: MessageToClient, _cx: &AsyncApp) {
-        match message {
-            MessageToClient::UserUpdated => {}
-        }
+    pub fn add_message_to_client_handler(
+        self: &Arc<Client>,
+        handler: impl Fn(&MessageToClient, &App) + Send + Sync + 'static,
+    ) {
+        self.message_to_client_handlers
+            .lock()
+            .push(Box::new(handler));
+    }
+
+    fn handle_message_to_client(self: &Arc<Client>, message: MessageToClient, cx: &AsyncApp) {
+        cx.update(|cx| {
+            for handler in self.message_to_client_handlers.lock().iter() {
+                handler(&message, cx);
+            }
+        })
+        .ok();
     }
 
     pub fn telemetry(&self) -> &Arc<Telemetry> {

crates/client/src/user.rs 🔗

@@ -1,6 +1,7 @@
 use super::{Client, Status, TypedEnvelope, proto};
 use anyhow::{Context as _, Result, anyhow};
 use chrono::{DateTime, Utc};
+use cloud_api_client::websocket_protocol::MessageToClient;
 use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
 use cloud_llm_client::{
     EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
@@ -181,6 +182,12 @@ impl UserStore {
             client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
             client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
         ];
+
+        client.add_message_to_client_handler({
+            let this = cx.weak_entity();
+            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
+        });
+
         Self {
             users: Default::default(),
             by_github_login: Default::default(),
@@ -813,6 +820,32 @@ impl UserStore {
         cx.emit(Event::PrivateUserInfoUpdated);
     }
 
+    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
+        cx.spawn(async move |cx| {
+            match message {
+                MessageToClient::UserUpdated => {
+                    let cloud_client = cx
+                        .update(|cx| {
+                            this.read_with(cx, |this, _cx| {
+                                this.client.upgrade().map(|client| client.cloud_client())
+                            })
+                        })??
+                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
+
+                    let response = cloud_client.get_authenticated_user().await?;
+                    cx.update(|cx| {
+                        this.update(cx, |this, cx| {
+                            this.update_authenticated_user(response, cx);
+                        })
+                    })??;
+                }
+            }
+
+            anyhow::Ok(())
+        })
+        .detach_and_log_err(cx);
+    }
+
     pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
         self.current_user.clone()
     }