@@ -8,19 +8,23 @@ use anyhow::{anyhow, Result};
use audio::Audio;
use call_settings::CallSettings;
use channel::ChannelId;
-use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore};
+use client::{
+ proto::{self, PeerId},
+ ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore,
+};
use collections::HashSet;
use futures::{future::Shared, FutureExt};
use postage::watch;
use gpui::{
- AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task,
- WeakModelHandle,
+ AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext,
+ ModelHandle, Subscription, Task, ViewContext, WeakModelHandle,
};
use project::Project;
pub use participant::ParticipantLocation;
pub use room::Room;
+use util::ResultExt;
pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
settings::register::<CallSettings>(cx);
@@ -49,9 +53,25 @@ pub struct ActiveCall {
),
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
+ follow_handlers: Vec<FollowHandler>,
+ followers: Vec<Follower>,
_subscriptions: Vec<client::Subscription>,
}
+#[derive(PartialEq, Eq, PartialOrd, Ord)]
+struct Follower {
+ project_id: Option<u64>,
+ peer_id: PeerId,
+}
+
+struct FollowHandler {
+ project_id: Option<u64>,
+ root_view: AnyWeakViewHandle,
+ get_views:
+ Box<dyn Fn(&AnyViewHandle, Option<u64>, &mut AppContext) -> Option<proto::FollowResponse>>,
+ update_view: Box<dyn Fn(&AnyViewHandle, PeerId, proto::UpdateFollowers, &mut AppContext)>,
+}
+
impl Entity for ActiveCall {
type Event = room::Event;
}
@@ -68,9 +88,14 @@ impl ActiveCall {
location: None,
pending_invites: Default::default(),
incoming_call: watch::channel(),
+ follow_handlers: Default::default(),
+ followers: Default::default(),
_subscriptions: vec![
client.add_request_handler(cx.handle(), Self::handle_incoming_call),
client.add_message_handler(cx.handle(), Self::handle_call_canceled),
+ client.add_request_handler(cx.handle(), Self::handle_follow),
+ client.add_message_handler(cx.handle(), Self::handle_unfollow),
+ client.add_message_handler(cx.handle(), Self::handle_update_followers),
],
client,
user_store,
@@ -81,6 +106,48 @@ impl ActiveCall {
self.room()?.read(cx).channel_id()
}
+ pub fn add_follow_handler<V: gpui::View, GetViews, UpdateView>(
+ &mut self,
+ root_view: gpui::ViewHandle<V>,
+ project_id: Option<u64>,
+ get_views: GetViews,
+ update_view: UpdateView,
+ _cx: &mut ModelContext<Self>,
+ ) where
+ GetViews: 'static
+ + Fn(&mut V, Option<u64>, &mut gpui::ViewContext<V>) -> Result<proto::FollowResponse>,
+ UpdateView:
+ 'static + Fn(&mut V, PeerId, proto::UpdateFollowers, &mut ViewContext<V>) -> Result<()>,
+ {
+ self.follow_handlers
+ .retain(|h| h.root_view.id() != root_view.id());
+ if let Err(ix) = self
+ .follow_handlers
+ .binary_search_by_key(&(project_id, root_view.id()), |f| {
+ (f.project_id, f.root_view.id())
+ })
+ {
+ self.follow_handlers.insert(
+ ix,
+ FollowHandler {
+ project_id,
+ root_view: root_view.into_any().downgrade(),
+ get_views: Box::new(move |view, project_id, cx| {
+ let view = view.clone().downcast::<V>().unwrap();
+ view.update(cx, |view, cx| get_views(view, project_id, cx).log_err())
+ .flatten()
+ }),
+ update_view: Box::new(move |view, leader_id, message, cx| {
+ let view = view.clone().downcast::<V>().unwrap();
+ view.update(cx, |view, cx| {
+ update_view(view, leader_id, message, cx).log_err()
+ });
+ }),
+ },
+ );
+ }
+ }
+
async fn handle_incoming_call(
this: ModelHandle<Self>,
envelope: TypedEnvelope<proto::IncomingCall>,
@@ -127,6 +194,127 @@ impl ActiveCall {
Ok(())
}
+ async fn handle_follow(
+ this: ModelHandle<Self>,
+ envelope: TypedEnvelope<proto::Follow>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<proto::FollowResponse> {
+ this.update(&mut cx, |this, cx| {
+ let follower = Follower {
+ project_id: envelope.payload.project_id,
+ peer_id: envelope.original_sender_id()?,
+ };
+ let active_project_id = this
+ .location
+ .as_ref()
+ .and_then(|project| project.upgrade(cx)?.read(cx).remote_id());
+
+ let mut response = proto::FollowResponse::default();
+ for handler in &this.follow_handlers {
+ if follower.project_id != handler.project_id && follower.project_id.is_some() {
+ continue;
+ }
+
+ let Some(root_view) = handler.root_view.upgrade(cx) else {
+ continue;
+ };
+
+ let Some(handler_response) =
+ (handler.get_views)(&root_view, follower.project_id, cx)
+ else {
+ continue;
+ };
+
+ if response.views.is_empty() {
+ response.views = handler_response.views;
+ } else {
+ response.views.extend_from_slice(&handler_response.views);
+ }
+
+ if let Some(active_view_id) = handler_response.active_view_id.clone() {
+ if response.active_view_id.is_none() || handler.project_id == active_project_id
+ {
+ response.active_view_id = Some(active_view_id);
+ }
+ }
+ }
+
+ if let Err(ix) = this.followers.binary_search(&follower) {
+ this.followers.insert(ix, follower);
+ }
+
+ Ok(response)
+ })
+ }
+
+ async fn handle_unfollow(
+ this: ModelHandle<Self>,
+ envelope: TypedEnvelope<proto::Unfollow>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ this.update(&mut cx, |this, _| {
+ let follower = Follower {
+ project_id: envelope.payload.project_id,
+ peer_id: envelope.original_sender_id()?,
+ };
+ if let Err(ix) = this.followers.binary_search(&follower) {
+ this.followers.remove(ix);
+ }
+ Ok(())
+ })
+ }
+
+ async fn handle_update_followers(
+ this: ModelHandle<Self>,
+ envelope: TypedEnvelope<proto::UpdateFollowers>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ let leader_id = envelope.original_sender_id()?;
+ let update = envelope.payload;
+ this.update(&mut cx, |this, cx| {
+ for handler in &this.follow_handlers {
+ if update.project_id != handler.project_id && update.project_id.is_some() {
+ continue;
+ }
+ let Some(root_view) = handler.root_view.upgrade(cx) else {
+ continue;
+ };
+ (handler.update_view)(&root_view, leader_id, update.clone(), cx);
+ }
+ Ok(())
+ })
+ }
+
+ pub fn update_followers(
+ &self,
+ project_id: Option<u64>,
+ update: proto::update_followers::Variant,
+ cx: &AppContext,
+ ) -> Option<()> {
+ let room_id = self.room()?.read(cx).id();
+ let follower_ids: Vec<_> = self
+ .followers
+ .iter()
+ .filter_map(|follower| {
+ (follower.project_id == project_id).then_some(follower.peer_id.into())
+ })
+ .collect();
+ if follower_ids.is_empty() {
+ return None;
+ }
+ self.client
+ .send(proto::UpdateFollowers {
+ room_id,
+ project_id,
+ follower_ids,
+ variant: Some(update),
+ })
+ .log_err()
+ }
+
pub fn global(cx: &AppContext) -> ModelHandle<Self> {
cx.global::<ModelHandle<Self>>().clone()
}
@@ -15,7 +15,7 @@ use call::ActiveCall;
use channel::ChannelStore;
use client::{
proto::{self, PeerId},
- Client, TypedEnvelope, UserStore,
+ Client, UserStore,
};
use collections::{hash_map, HashMap, HashSet};
use drag_and_drop::DragAndDrop;
@@ -331,11 +331,6 @@ pub fn init(app_state: Arc<AppState>, cx: &mut AppContext) {
})
.detach();
});
-
- let client = &app_state.client;
- client.add_view_request_handler(Workspace::handle_follow);
- client.add_view_message_handler(Workspace::handle_unfollow);
- client.add_view_message_handler(Workspace::handle_update_followers);
}
type ProjectItemBuilders = HashMap<
@@ -507,7 +502,6 @@ pub enum Event {
pub struct Workspace {
weak_self: WeakViewHandle<Self>,
- remote_entity_subscription: Option<client::Subscription>,
modal: Option<ActiveModal>,
zoomed: Option<AnyWeakViewHandle>,
zoomed_position: Option<DockPosition>,
@@ -523,7 +517,6 @@ pub struct Workspace {
titlebar_item: Option<AnyViewHandle>,
notifications: Vec<(TypeId, usize, Box<dyn NotificationHandle>)>,
project: ModelHandle<Project>,
- leader_state: LeaderState,
follower_states_by_leader: FollowerStatesByLeader,
last_leaders_by_pane: HashMap<WeakViewHandle<Pane>, PeerId>,
window_edited: bool,
@@ -549,11 +542,6 @@ pub struct ViewId {
pub id: u64,
}
-#[derive(Default)]
-struct LeaderState {
- followers: HashSet<PeerId>,
-}
-
type FollowerStatesByLeader = HashMap<PeerId, HashMap<ViewHandle<Pane>, FollowerState>>;
#[derive(Default)]
@@ -737,12 +725,10 @@ impl Workspace {
status_bar,
titlebar_item: None,
notifications: Default::default(),
- remote_entity_subscription: None,
left_dock,
bottom_dock,
right_dock,
project: project.clone(),
- leader_state: Default::default(),
follower_states_by_leader: Default::default(),
last_leaders_by_pane: Default::default(),
window_edited: false,
@@ -2419,19 +2405,21 @@ impl Workspace {
}
fn project_remote_id_changed(&mut self, remote_id: Option<u64>, cx: &mut ViewContext<Self>) {
- if let Some(remote_id) = remote_id {
- self.remote_entity_subscription = Some(
- self.app_state
- .client
- .add_view_for_remote_entity(remote_id, cx),
- );
- } else {
- self.remote_entity_subscription.take();
+ let handle = cx.handle();
+ if let Some(call) = self.active_call() {
+ call.update(cx, |call, cx| {
+ call.add_follow_handler(
+ handle,
+ remote_id,
+ Self::get_views_for_followers,
+ Self::handle_update_followers,
+ cx,
+ );
+ });
}
}
fn collaborator_left(&mut self, peer_id: PeerId, cx: &mut ViewContext<Self>) {
- self.leader_state.followers.remove(&peer_id);
if let Some(states_by_pane) = self.follower_states_by_leader.remove(&peer_id) {
for state in states_by_pane.into_values() {
for item in state.items_by_leader_view_id.into_values() {
@@ -2463,8 +2451,10 @@ impl Workspace {
.insert(pane.clone(), Default::default());
cx.notify();
- let project_id = self.project.read(cx).remote_id()?;
+ let room_id = self.active_call()?.read(cx).room()?.read(cx).id();
+ let project_id = self.project.read(cx).remote_id();
let request = self.app_state.client.request(proto::Follow {
+ room_id,
project_id,
leader_id: Some(leader_id),
});
@@ -2542,15 +2532,16 @@ impl Workspace {
if states_by_pane.is_empty() {
self.follower_states_by_leader.remove(&leader_id);
- if let Some(project_id) = self.project.read(cx).remote_id() {
- self.app_state
- .client
- .send(proto::Unfollow {
- project_id,
- leader_id: Some(leader_id),
- })
- .log_err();
- }
+ let project_id = self.project.read(cx).remote_id();
+ let room_id = self.active_call()?.read(cx).room()?.read(cx).id();
+ self.app_state
+ .client
+ .send(proto::Unfollow {
+ room_id,
+ project_id,
+ leader_id: Some(leader_id),
+ })
+ .log_err();
}
cx.notify();
@@ -2564,10 +2555,6 @@ impl Workspace {
self.follower_states_by_leader.contains_key(&peer_id)
}
- pub fn is_followed_by(&self, peer_id: PeerId) -> bool {
- self.leader_state.followers.contains(&peer_id)
- }
-
fn render_titlebar(&self, theme: &Theme, cx: &mut ViewContext<Self>) -> AnyElement<Self> {
// TODO: There should be a better system in place for this
// (https://github.com/zed-industries/zed/issues/1290)
@@ -2718,80 +2705,56 @@ impl Workspace {
// RPC handlers
- async fn handle_follow(
- this: WeakViewHandle<Self>,
- envelope: TypedEnvelope<proto::Follow>,
- _: Arc<Client>,
- mut cx: AsyncAppContext,
+ fn get_views_for_followers(
+ &mut self,
+ _project_id: Option<u64>,
+ cx: &mut ViewContext<Self>,
) -> Result<proto::FollowResponse> {
- this.update(&mut cx, |this, cx| {
- let client = &this.app_state.client;
- this.leader_state
- .followers
- .insert(envelope.original_sender_id()?);
+ let client = &self.app_state.client;
- let active_view_id = this.active_item(cx).and_then(|i| {
- Some(
- i.to_followable_item_handle(cx)?
- .remote_id(client, cx)?
- .to_proto(),
- )
- });
+ let active_view_id = self.active_item(cx).and_then(|i| {
+ Some(
+ i.to_followable_item_handle(cx)?
+ .remote_id(client, cx)?
+ .to_proto(),
+ )
+ });
- cx.notify();
+ cx.notify();
- Ok(proto::FollowResponse {
- active_view_id,
- views: this
- .panes()
- .iter()
- .flat_map(|pane| {
- let leader_id = this.leader_for_pane(pane);
- pane.read(cx).items().filter_map({
- let cx = &cx;
- move |item| {
- let item = item.to_followable_item_handle(cx)?;
- let id = item.remote_id(client, cx)?.to_proto();
- let variant = item.to_state_proto(cx)?;
- Some(proto::View {
- id: Some(id),
- leader_id,
- variant: Some(variant),
- })
- }
- })
+ Ok(proto::FollowResponse {
+ active_view_id,
+ views: self
+ .panes()
+ .iter()
+ .flat_map(|pane| {
+ let leader_id = self.leader_for_pane(pane);
+ pane.read(cx).items().filter_map({
+ let cx = &cx;
+ move |item| {
+ let item = item.to_followable_item_handle(cx)?;
+ let id = item.remote_id(client, cx)?.to_proto();
+ let variant = item.to_state_proto(cx)?;
+ Some(proto::View {
+ id: Some(id),
+ leader_id,
+ variant: Some(variant),
+ })
+ }
})
- .collect(),
- })
- })?
- }
-
- async fn handle_unfollow(
- this: WeakViewHandle<Self>,
- envelope: TypedEnvelope<proto::Unfollow>,
- _: Arc<Client>,
- mut cx: AsyncAppContext,
- ) -> Result<()> {
- this.update(&mut cx, |this, cx| {
- this.leader_state
- .followers
- .remove(&envelope.original_sender_id()?);
- cx.notify();
- Ok(())
- })?
+ })
+ .collect(),
+ })
}
- async fn handle_update_followers(
- this: WeakViewHandle<Self>,
- envelope: TypedEnvelope<proto::UpdateFollowers>,
- _: Arc<Client>,
- cx: AsyncAppContext,
+ fn handle_update_followers(
+ &mut self,
+ leader_id: PeerId,
+ message: proto::UpdateFollowers,
+ _cx: &mut ViewContext<Self>,
) -> Result<()> {
- let leader_id = envelope.original_sender_id()?;
- this.read_with(&cx, |this, _| {
- this.leader_updates_tx
- .unbounded_send((leader_id, envelope.payload))
- })??;
+ self.leader_updates_tx
+ .unbounded_send((leader_id, message))?;
Ok(())
}
@@ -2960,18 +2923,9 @@ impl Workspace {
update: proto::update_followers::Variant,
cx: &AppContext,
) -> Option<()> {
- let project_id = self.project.read(cx).remote_id()?;
- if !self.leader_state.followers.is_empty() {
- self.app_state
- .client
- .send(proto::UpdateFollowers {
- project_id,
- follower_ids: self.leader_state.followers.iter().copied().collect(),
- variant: Some(update),
- })
- .log_err();
- }
- None
+ self.active_call()?
+ .read(cx)
+ .update_followers(self.project.read(cx).remote_id(), update, cx)
}
pub fn leader_for_pane(&self, pane: &ViewHandle<Pane>) -> Option<PeerId> {