Start on subscribing to messages in channel entity instances

Nathan Sobo , Max Brunsfeld , and Antonio Scandurra created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>
Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

gpui/src/app.rs    | 66 ++++++++++++++++++++++++++++++++++++++++++-----
zed/src/channel.rs | 63 +++++++++++++++++++++++++++++++++++----------
zed/src/rpc.rs     |  6 +++
zrpc/src/peer.rs   | 18 +++++++++---
zrpc/src/proto.rs  |  2 
5 files changed, 126 insertions(+), 29 deletions(-)

Detailed changes

gpui/src/app.rs 🔗

@@ -70,6 +70,11 @@ pub trait UpdateModel {
         F: FnOnce(&mut T, &mut ModelContext<T>) -> S;
 }
 
+pub trait UpgradeModelHandle {
+    fn upgrade_model_handle<T: Entity>(&self, handle: WeakModelHandle<T>)
+        -> Option<ModelHandle<T>>;
+}
+
 pub trait ReadView {
     fn read_view<T: View>(&self, handle: &ViewHandle<T>) -> &T;
 }
@@ -457,6 +462,15 @@ impl UpdateModel for AsyncAppContext {
     }
 }
 
+impl UpgradeModelHandle for AsyncAppContext {
+    fn upgrade_model_handle<T: Entity>(
+        &self,
+        handle: WeakModelHandle<T>,
+    ) -> Option<ModelHandle<T>> {
+        self.0.borrow_mut().upgrade_model_handle(handle)
+    }
+}
+
 impl ReadModelWith for AsyncAppContext {
     fn read_model_with<E: Entity, F: FnOnce(&E, &AppContext) -> T, T>(
         &self,
@@ -1434,6 +1448,15 @@ impl UpdateModel for MutableAppContext {
     }
 }
 
+impl UpgradeModelHandle for MutableAppContext {
+    fn upgrade_model_handle<T: Entity>(
+        &self,
+        handle: WeakModelHandle<T>,
+    ) -> Option<ModelHandle<T>> {
+        self.cx.upgrade_model_handle(handle)
+    }
+}
+
 impl ReadView for MutableAppContext {
     fn read_view<T: View>(&self, handle: &ViewHandle<T>) -> &T {
         if let Some(view) = self.cx.views.get(&(handle.window_id, handle.view_id)) {
@@ -1565,6 +1588,19 @@ impl ReadModel for AppContext {
     }
 }
 
+impl UpgradeModelHandle for AppContext {
+    fn upgrade_model_handle<T: Entity>(
+        &self,
+        handle: WeakModelHandle<T>,
+    ) -> Option<ModelHandle<T>> {
+        if self.models.contains_key(&handle.model_id) {
+            Some(ModelHandle::new(handle.model_id, &self.ref_counts))
+        } else {
+            None
+        }
+    }
+}
+
 impl ReadView for AppContext {
     fn read_view<T: View>(&self, handle: &ViewHandle<T>) -> &T {
         if let Some(view) = self.views.get(&(handle.window_id, handle.view_id)) {
@@ -1858,6 +1894,15 @@ impl<M> UpdateModel for ModelContext<'_, M> {
     }
 }
 
+impl<M> UpgradeModelHandle for ModelContext<'_, M> {
+    fn upgrade_model_handle<T: Entity>(
+        &self,
+        handle: WeakModelHandle<T>,
+    ) -> Option<ModelHandle<T>> {
+        self.cx.upgrade_model_handle(handle)
+    }
+}
+
 impl<M> Deref for ModelContext<'_, M> {
     type Target = MutableAppContext;
 
@@ -2175,6 +2220,15 @@ impl<V> ReadModel for ViewContext<'_, V> {
     }
 }
 
+impl<V> UpgradeModelHandle for ViewContext<'_, V> {
+    fn upgrade_model_handle<T: Entity>(
+        &self,
+        handle: WeakModelHandle<T>,
+    ) -> Option<ModelHandle<T>> {
+        self.cx.upgrade_model_handle(handle)
+    }
+}
+
 impl<V: View> UpdateModel for ViewContext<'_, V> {
     fn update_model<T, F, S>(&mut self, handle: &ModelHandle<T>, update: F) -> S
     where
@@ -2372,7 +2426,6 @@ impl<T> Handle<T> for ModelHandle<T> {
         EntityLocation::Model(self.model_id)
     }
 }
-
 pub struct WeakModelHandle<T> {
     model_id: usize,
     model_type: PhantomData<T>,
@@ -2386,13 +2439,8 @@ impl<T: Entity> WeakModelHandle<T> {
         }
     }
 
-    pub fn upgrade(&self, cx: impl AsRef<AppContext>) -> Option<ModelHandle<T>> {
-        let cx = cx.as_ref();
-        if cx.models.contains_key(&self.model_id) {
-            Some(ModelHandle::new(self.model_id, &cx.ref_counts))
-        } else {
-            None
-        }
+    pub fn upgrade(self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<T>> {
+        cx.upgrade_model_handle(self)
     }
 }
 
@@ -2419,6 +2467,8 @@ impl<T> Clone for WeakModelHandle<T> {
     }
 }
 
+impl<T> Copy for WeakModelHandle<T> {}
+
 pub struct ViewHandle<T> {
     window_id: usize,
     view_id: usize,

zed/src/channel.rs 🔗

@@ -1,7 +1,8 @@
 use crate::rpc::{self, Client};
 use anyhow::{anyhow, Result};
+use futures::StreamExt;
 use gpui::{
-    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, WeakModelHandle,
+    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 };
 use std::{
     collections::{HashMap, VecDeque},
@@ -24,6 +25,7 @@ pub struct Channel {
     first_message_id: Option<u64>,
     messages: Option<VecDeque<ChannelMessage>>,
     rpc: Arc<Client>,
+    _receive_messages: Task<()>,
 }
 
 pub struct ChannelMessage {
@@ -43,19 +45,20 @@ impl ChannelList {
     ) -> Self {
         // Subscribe to messages.
         let this = cx.handle().downgrade();
-        rpc.on_message(
-            router,
-            |envelope, rpc, cx: &mut AsyncAppContext| async move {
-                cx.update(|cx| {
-                    if let Some(this) = this.upgrade(cx) {
-                        this.update(cx, |this, cx| this.receive_message(envelope, cx))
-                    } else {
-                        Err(anyhow!("can't upgrade ChannelList handle"))
-                    }
-                })
-            },
-            cx,
-        );
+
+        // rpc.on_message(
+        //     router,
+        //     |envelope, rpc, cx: &mut AsyncAppContext| async move {
+        //         cx.update(|cx| {
+        //             if let Some(this) = this.upgrade(cx) {
+        //                 this.update(cx, |this, cx| this.receive_message(envelope, cx))
+        //             } else {
+        //                 Err(anyhow!("can't upgrade ChannelList handle"))
+        //             }
+        //         })
+        //     },
+        //     cx,
+        // );
 
         Self {
             available_channels: Default::default(),
@@ -72,3 +75,35 @@ impl ChannelList {
         Ok(())
     }
 }
+
+impl Entity for Channel {
+    type Event = ();
+}
+
+impl Channel {
+    pub fn new(details: ChannelDetails, rpc: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
+        let messages = rpc.subscribe();
+        let receive_messages = cx.spawn_weak(|this, cx| async move {
+            while let Some(message) = messages.next().await {
+                if let Some(this) = this.upgrade(&cx) {
+                    this.update(&mut cx, |this, cx| this.message_received(&message, cx));
+                }
+            }
+        });
+
+        Self {
+            details,
+            rpc,
+            first_message_id: None,
+            messages: None,
+            _receive_messages: receive_messages,
+        }
+    }
+
+    fn message_received(
+        &mut self,
+        message: &TypedEnvelope<ChannelMessageSent>,
+        cx: &mut ModelContext<Self>,
+    ) {
+    }
+}

zed/src/rpc.rs 🔗

@@ -2,6 +2,7 @@ use crate::{language::LanguageRegistry, worktree::Worktree};
 use anyhow::{anyhow, Context, Result};
 use async_tungstenite::tungstenite::http::Request;
 use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
+use futures::Stream;
 use gpui::{AsyncAppContext, ModelHandle, Task, WeakModelHandle};
 use lazy_static::lazy_static;
 use smol::lock::RwLock;
@@ -29,7 +30,6 @@ pub struct Client {
 pub struct ClientState {
     connection_id: Option<ConnectionId>,
     pub shared_worktrees: HashMap<u64, WeakModelHandle<Worktree>>,
-    pub channel_list: Option<WeakModelHandle<ChannelList>>,
     pub languages: Arc<LanguageRegistry>,
 }
 
@@ -82,6 +82,10 @@ impl Client {
         });
     }
 
+    pub fn subscribe<T: EnvelopedMessage>(&self) -> impl Stream<Item = Arc<TypedEnvelope<T>>> {
+        self.peer.subscribe()
+    }
+
     pub async fn log_in_and_connect(
         &self,
         router: Arc<ForegroundRouter>,

zrpc/src/peer.rs 🔗

@@ -3,15 +3,15 @@ use anyhow::{anyhow, Context, Result};
 use async_lock::{Mutex, RwLock};
 use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
 use futures::{
-    future::{BoxFuture, LocalBoxFuture},
-    FutureExt, StreamExt,
+    future::{self, BoxFuture, LocalBoxFuture},
+    FutureExt, Stream, StreamExt,
 };
 use postage::{
-    mpsc,
-    prelude::{Sink, Stream},
+    broadcast, mpsc,
+    prelude::{Sink as _, Stream as _},
 };
 use std::{
-    any::TypeId,
+    any::{Any, TypeId},
     collections::{HashMap, HashSet},
     fmt,
     future::Future,
@@ -77,6 +77,7 @@ pub struct RouterInternal<H> {
 pub struct Peer {
     connections: RwLock<HashMap<ConnectionId, Connection>>,
     next_connection_id: AtomicU32,
+    incoming_messages: broadcast::Sender<Arc<dyn Any + Send + Sync>>,
 }
 
 #[derive(Clone)]
@@ -91,6 +92,7 @@ impl Peer {
         Arc::new(Self {
             connections: Default::default(),
             next_connection_id: Default::default(),
+            incoming_messages: broadcast::channel(256).0,
         })
     }
 
@@ -189,6 +191,12 @@ impl Peer {
         self.connections.write().await.clear();
     }
 
+    pub fn subscribe<T: EnvelopedMessage>(&self) -> impl Stream<Item = Arc<TypedEnvelope<T>>> {
+        self.incoming_messages
+            .subscribe()
+            .filter_map(|envelope| future::ready(Arc::downcast(envelope).ok()))
+    }
+
     pub fn request<T: RequestMessage>(
         self: &Arc<Self>,
         receiver_id: ConnectionId,

zrpc/src/proto.rs 🔗

@@ -8,7 +8,7 @@ use std::{
 
 include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 
-pub trait EnvelopedMessage: Clone + Sized + Send + 'static {
+pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
     const NAME: &'static str;
     fn into_envelope(
         self,