From 541f58e12ce5e0d360d666c7366e95f406afb06b Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Wed, 18 Aug 2021 11:45:29 -0600 Subject: [PATCH] Start on subscribing to messages in channel entity instances Co-Authored-By: Max Brunsfeld Co-Authored-By: Antonio Scandurra --- gpui/src/app.rs | 66 ++++++++++++++++++++++++++++++++++++++++------ zed/src/channel.rs | 63 +++++++++++++++++++++++++++++++++---------- zed/src/rpc.rs | 6 ++++- zrpc/src/peer.rs | 18 +++++++++---- zrpc/src/proto.rs | 2 +- 5 files changed, 126 insertions(+), 29 deletions(-) diff --git a/gpui/src/app.rs b/gpui/src/app.rs index 579bf5c4682dba2fcfbe15d48166396f3a9ea81d..a2fa8188246bed0c3b2dab51a266b48926502d2c 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -70,6 +70,11 @@ pub trait UpdateModel { F: FnOnce(&mut T, &mut ModelContext) -> S; } +pub trait UpgradeModelHandle { + fn upgrade_model_handle(&self, handle: WeakModelHandle) + -> Option>; +} + pub trait ReadView { fn read_view(&self, handle: &ViewHandle) -> &T; } @@ -457,6 +462,15 @@ impl UpdateModel for AsyncAppContext { } } +impl UpgradeModelHandle for AsyncAppContext { + fn upgrade_model_handle( + &self, + handle: WeakModelHandle, + ) -> Option> { + self.0.borrow_mut().upgrade_model_handle(handle) + } +} + impl ReadModelWith for AsyncAppContext { fn read_model_with T, T>( &self, @@ -1434,6 +1448,15 @@ impl UpdateModel for MutableAppContext { } } +impl UpgradeModelHandle for MutableAppContext { + fn upgrade_model_handle( + &self, + handle: WeakModelHandle, + ) -> Option> { + self.cx.upgrade_model_handle(handle) + } +} + impl ReadView for MutableAppContext { fn read_view(&self, handle: &ViewHandle) -> &T { if let Some(view) = self.cx.views.get(&(handle.window_id, handle.view_id)) { @@ -1565,6 +1588,19 @@ impl ReadModel for AppContext { } } +impl UpgradeModelHandle for AppContext { + fn upgrade_model_handle( + &self, + handle: WeakModelHandle, + ) -> Option> { + if self.models.contains_key(&handle.model_id) { + Some(ModelHandle::new(handle.model_id, &self.ref_counts)) + } else { + None + } + } +} + impl ReadView for AppContext { fn read_view(&self, handle: &ViewHandle) -> &T { if let Some(view) = self.views.get(&(handle.window_id, handle.view_id)) { @@ -1858,6 +1894,15 @@ impl UpdateModel for ModelContext<'_, M> { } } +impl UpgradeModelHandle for ModelContext<'_, M> { + fn upgrade_model_handle( + &self, + handle: WeakModelHandle, + ) -> Option> { + self.cx.upgrade_model_handle(handle) + } +} + impl Deref for ModelContext<'_, M> { type Target = MutableAppContext; @@ -2175,6 +2220,15 @@ impl ReadModel for ViewContext<'_, V> { } } +impl UpgradeModelHandle for ViewContext<'_, V> { + fn upgrade_model_handle( + &self, + handle: WeakModelHandle, + ) -> Option> { + self.cx.upgrade_model_handle(handle) + } +} + impl UpdateModel for ViewContext<'_, V> { fn update_model(&mut self, handle: &ModelHandle, update: F) -> S where @@ -2372,7 +2426,6 @@ impl Handle for ModelHandle { EntityLocation::Model(self.model_id) } } - pub struct WeakModelHandle { model_id: usize, model_type: PhantomData, @@ -2386,13 +2439,8 @@ impl WeakModelHandle { } } - pub fn upgrade(&self, cx: impl AsRef) -> Option> { - let cx = cx.as_ref(); - if cx.models.contains_key(&self.model_id) { - Some(ModelHandle::new(self.model_id, &cx.ref_counts)) - } else { - None - } + pub fn upgrade(self, cx: &impl UpgradeModelHandle) -> Option> { + cx.upgrade_model_handle(self) } } @@ -2419,6 +2467,8 @@ impl Clone for WeakModelHandle { } } +impl Copy for WeakModelHandle {} + pub struct ViewHandle { window_id: usize, view_id: usize, diff --git a/zed/src/channel.rs b/zed/src/channel.rs index 2ec8b8c2acfab903137693aae8a04b2c0ff7bfd4..0554f2b1838e11dca3f9f5185c699360059f499d 100644 --- a/zed/src/channel.rs +++ b/zed/src/channel.rs @@ -1,7 +1,8 @@ use crate::rpc::{self, Client}; use anyhow::{anyhow, Result}; +use futures::StreamExt; use gpui::{ - AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, WeakModelHandle, + AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle, }; use std::{ collections::{HashMap, VecDeque}, @@ -24,6 +25,7 @@ pub struct Channel { first_message_id: Option, messages: Option>, rpc: Arc, + _receive_messages: Task<()>, } pub struct ChannelMessage { @@ -43,19 +45,20 @@ impl ChannelList { ) -> Self { // Subscribe to messages. let this = cx.handle().downgrade(); - rpc.on_message( - router, - |envelope, rpc, cx: &mut AsyncAppContext| async move { - cx.update(|cx| { - if let Some(this) = this.upgrade(cx) { - this.update(cx, |this, cx| this.receive_message(envelope, cx)) - } else { - Err(anyhow!("can't upgrade ChannelList handle")) - } - }) - }, - cx, - ); + + // rpc.on_message( + // router, + // |envelope, rpc, cx: &mut AsyncAppContext| async move { + // cx.update(|cx| { + // if let Some(this) = this.upgrade(cx) { + // this.update(cx, |this, cx| this.receive_message(envelope, cx)) + // } else { + // Err(anyhow!("can't upgrade ChannelList handle")) + // } + // }) + // }, + // cx, + // ); Self { available_channels: Default::default(), @@ -72,3 +75,35 @@ impl ChannelList { Ok(()) } } + +impl Entity for Channel { + type Event = (); +} + +impl Channel { + pub fn new(details: ChannelDetails, rpc: Arc, cx: &mut ModelContext) -> Self { + let messages = rpc.subscribe(); + let receive_messages = cx.spawn_weak(|this, cx| async move { + while let Some(message) = messages.next().await { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| this.message_received(&message, cx)); + } + } + }); + + Self { + details, + rpc, + first_message_id: None, + messages: None, + _receive_messages: receive_messages, + } + } + + fn message_received( + &mut self, + message: &TypedEnvelope, + cx: &mut ModelContext, + ) { + } +} diff --git a/zed/src/rpc.rs b/zed/src/rpc.rs index 7c1640ce82c809ba90a092cfd233d8713813ee53..c987e34b1657cfb2fe23e0af54b75b7a9792078b 100644 --- a/zed/src/rpc.rs +++ b/zed/src/rpc.rs @@ -2,6 +2,7 @@ use crate::{language::LanguageRegistry, worktree::Worktree}; use anyhow::{anyhow, Context, Result}; use async_tungstenite::tungstenite::http::Request; use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage}; +use futures::Stream; use gpui::{AsyncAppContext, ModelHandle, Task, WeakModelHandle}; use lazy_static::lazy_static; use smol::lock::RwLock; @@ -29,7 +30,6 @@ pub struct Client { pub struct ClientState { connection_id: Option, pub shared_worktrees: HashMap>, - pub channel_list: Option>, pub languages: Arc, } @@ -82,6 +82,10 @@ impl Client { }); } + pub fn subscribe(&self) -> impl Stream>> { + self.peer.subscribe() + } + pub async fn log_in_and_connect( &self, router: Arc, diff --git a/zrpc/src/peer.rs b/zrpc/src/peer.rs index 2093d6a73215a5463502b28935a258ddb3de1751..37a8bc3efbb90a93e9d5d3f2d9b683a9545ca162 100644 --- a/zrpc/src/peer.rs +++ b/zrpc/src/peer.rs @@ -3,15 +3,15 @@ use anyhow::{anyhow, Context, Result}; use async_lock::{Mutex, RwLock}; use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage}; use futures::{ - future::{BoxFuture, LocalBoxFuture}, - FutureExt, StreamExt, + future::{self, BoxFuture, LocalBoxFuture}, + FutureExt, Stream, StreamExt, }; use postage::{ - mpsc, - prelude::{Sink, Stream}, + broadcast, mpsc, + prelude::{Sink as _, Stream as _}, }; use std::{ - any::TypeId, + any::{Any, TypeId}, collections::{HashMap, HashSet}, fmt, future::Future, @@ -77,6 +77,7 @@ pub struct RouterInternal { pub struct Peer { connections: RwLock>, next_connection_id: AtomicU32, + incoming_messages: broadcast::Sender>, } #[derive(Clone)] @@ -91,6 +92,7 @@ impl Peer { Arc::new(Self { connections: Default::default(), next_connection_id: Default::default(), + incoming_messages: broadcast::channel(256).0, }) } @@ -189,6 +191,12 @@ impl Peer { self.connections.write().await.clear(); } + pub fn subscribe(&self) -> impl Stream>> { + self.incoming_messages + .subscribe() + .filter_map(|envelope| future::ready(Arc::downcast(envelope).ok())) + } + pub fn request( self: &Arc, receiver_id: ConnectionId, diff --git a/zrpc/src/proto.rs b/zrpc/src/proto.rs index 77390cbb175b062aa2ae5b8b6cc74e0dcda74172..9d44b4da384d89a29ed1be827cd214f981f87889 100644 --- a/zrpc/src/proto.rs +++ b/zrpc/src/proto.rs @@ -8,7 +8,7 @@ use std::{ include!(concat!(env!("OUT_DIR"), "/zed.messages.rs")); -pub trait EnvelopedMessage: Clone + Sized + Send + 'static { +pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static { const NAME: &'static str; fn into_envelope( self,