WIP

Antonio Scandurra created

Change summary

crates/client2/src/client2.rs |  8 +++---
crates/client2/src/user.rs    | 48 +++++++++++++++++++++---------------
2 files changed, 32 insertions(+), 24 deletions(-)

Detailed changes

crates/client2/src/client2.rs 🔗

@@ -15,7 +15,7 @@ use futures::{
 };
 use gpui2::{
     serde_json, AnyHandle, AnyWeakHandle, AnyWindowHandle, AppContext, AsyncAppContext, Handle,
-    SemanticVersion, Task, ViewContext,
+    SemanticVersion, Task, ViewContext, WeakHandle,
 };
 use lazy_static::lazy_static;
 use parking_lot::RwLock;
@@ -575,7 +575,7 @@ impl Client {
     #[track_caller]
     pub fn add_message_handler<M, E, H, F>(
         self: &Arc<Self>,
-        entity: Handle<E>,
+        entity: WeakHandle<E>,
         handler: H,
     ) -> Subscription
     where
@@ -589,7 +589,7 @@ impl Client {
         let mut state = self.state.write();
         state
             .models_by_message_type
-            .insert(message_type_id, entity.downgrade().into());
+            .insert(message_type_id, entity.into());
 
         let prev_handler = state.message_handlers.insert(
             message_type_id,
@@ -617,7 +617,7 @@ impl Client {
 
     pub fn add_request_handler<M, E, H, F>(
         self: &Arc<Self>,
-        model: Handle<E>,
+        model: WeakHandle<E>,
         handler: H,
     ) -> Subscription
     where

crates/client2/src/user.rs 🔗

@@ -137,25 +137,32 @@ impl UserStore {
             client: Arc::downgrade(&client),
             update_contacts_tx,
             http,
-            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
+            _maintain_contacts: cx.spawn(|this, mut cx| async move {
                 let _subscriptions = rpc_subscriptions;
                 while let Some(message) = update_contacts_rx.next().await {
-                    if let Some(this) = this.upgrade(&cx) {
+                    if let Ok(task) =
                         this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
-                            .log_err()
-                            .await;
+                    {
+                        task.log_err().await;
+                    } else {
+                        break;
                     }
                 }
             }),
-            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
+            _maintain_current_user: cx.spawn(|this, mut cx| async move {
                 let mut status = client.status();
                 while let Some(status) = status.next().await {
                     match status {
                         Status::Connected { .. } => {
-                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
-                                let fetch_user = this
-                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
-                                    .log_err();
+                            if let Some(user_id) = client.user_id() {
+                                let fetch_user = if let Ok(fetch_user) = this
+                                    .update(&mut cx, |this, cx| {
+                                        this.get_user(user_id, cx).log_err()
+                                    }) {
+                                    fetch_user
+                                } else {
+                                    break;
+                                };
                                 let fetch_metrics_id =
                                     client.request(proto::GetPrivateUserInfo {}).log_err();
                                 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
@@ -605,7 +612,7 @@ impl UserStore {
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<Arc<User>>> {
         if let Some(user) = self.users.get(&user_id).cloned() {
-            return cx.foreground().spawn(async move { Ok(user) });
+            return Task::ready(Ok(user));
         }
 
         let load_users = self.get_users(vec![user_id], cx);
@@ -616,7 +623,7 @@ impl UserStore {
                     .get(&user_id)
                     .cloned()
                     .ok_or_else(|| anyhow!("server responded with no users"))
-            })
+            })?
         })
     }
 
@@ -635,7 +642,7 @@ impl UserStore {
     ) -> Task<Result<Vec<Arc<User>>>> {
         let client = self.client.clone();
         let http = self.http.clone();
-        cx.spawn_weak(|this, mut cx| async move {
+        cx.spawn(|this, mut cx| async move {
             if let Some(rpc) = client.upgrade() {
                 let response = rpc.request(request).await.context("error loading users")?;
                 let users = future::join_all(
@@ -646,13 +653,13 @@ impl UserStore {
                 )
                 .await;
 
-                if let Some(this) = this.upgrade(&cx) {
-                    this.update(&mut cx, |this, _| {
-                        for user in &users {
-                            this.users.insert(user.id, user.clone());
-                        }
-                    });
-                }
+                this.update(&mut cx, |this, _| {
+                    for user in &users {
+                        this.users.insert(user.id, user.clone());
+                    }
+                })
+                .ok();
+
                 Ok(users)
             } else {
                 Ok(Vec::new())
@@ -715,6 +722,7 @@ impl Collaborator {
     }
 }
 
+// todo!("we probably don't need this now that we fetch")
 async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
     let mut response = http
         .get(url, Default::default(), true)
@@ -733,5 +741,5 @@ async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>
         .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
     let format = image::guess_format(&body)?;
     let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
-    Ok(ImageData::new(image))
+    Ok(Arc::new(ImageData::new(image)))
 }