@@ -190,7 +190,7 @@ impl Channel {
rpc: Arc<Client>,
cx: &mut ModelContext<Self>,
) -> Self {
- let _subscription = rpc.subscribe_from_model(details.id, cx, Self::handle_message_sent);
+ let _subscription = rpc.subscribe_to_entity(details.id, cx, Self::handle_message_sent);
{
let user_store = user_store.clone();
@@ -230,7 +230,47 @@ impl Client {
}
}
- pub fn subscribe_from_model<T, M, F>(
+ pub fn subscribe<T, M, F>(self: &Arc<Self>, cx: ModelContext<M>, mut handler: F) -> Subscription
+ where
+ T: EnvelopedMessage,
+ M: Entity,
+ F: 'static
+ + Send
+ + Sync
+ + FnMut(&mut M, TypedEnvelope<T>, Arc<Self>, &mut ModelContext<M>) -> Result<()>,
+ {
+ let subscription_id = (TypeId::of::<T>(), Default::default());
+ let client = self.clone();
+ let mut state = self.state.write();
+ let model = cx.handle().downgrade();
+ let prev_extractor = state
+ .entity_id_extractors
+ .insert(subscription_id.0, Box::new(|_| Default::default()));
+ if prev_extractor.is_some() {
+ panic!("registered a handler for the same entity twice")
+ }
+
+ state.model_handlers.insert(
+ subscription_id,
+ Box::new(move |envelope, cx| {
+ if let Some(model) = model.upgrade(cx) {
+ let envelope = envelope.into_any().downcast::<TypedEnvelope<T>>().unwrap();
+ model.update(cx, |model, cx| {
+ if let Err(error) = handler(model, *envelope, client.clone(), cx) {
+ log::error!("error handling message: {}", error)
+ }
+ });
+ }
+ }),
+ );
+
+ Subscription {
+ client: Arc::downgrade(self),
+ id: subscription_id,
+ }
+ }
+
+ pub fn subscribe_to_entity<T, M, F>(
self: &Arc<Self>,
remote_id: u64,
cx: &mut ModelContext<M>,
@@ -211,11 +211,11 @@ impl Worktree {
}
let _subscriptions = vec![
- rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer),
- rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer),
- rpc.subscribe_from_model(remote_id, cx, Self::handle_update),
- rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer),
- rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_add_peer),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_update),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
];
Worktree::Remote(RemoteWorktree {
@@ -1070,12 +1070,12 @@ impl LocalWorktree {
this.update(&mut cx, |worktree, cx| {
let _subscriptions = vec![
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
];
let worktree = worktree.as_local_mut().unwrap();