Return error if subscribing to an entity that was already subscribed to

Antonio Scandurra created

Change summary

crates/client/src/client.rs   | 27 +++++++++++++++++----------
crates/project/src/project.rs | 13 ++++++-------
2 files changed, 23 insertions(+), 17 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -473,18 +473,22 @@ impl Client {
     pub fn subscribe_to_entity<T: Entity>(
         self: &Arc<Self>,
         remote_id: u64,
-    ) -> PendingEntitySubscription<T> {
+    ) -> Result<PendingEntitySubscription<T>> {
         let id = (TypeId::of::<T>(), remote_id);
-        self.state
-            .write()
-            .entities_by_type_and_remote_id
-            .insert(id, WeakSubscriber::Pending(Default::default()));
 
-        PendingEntitySubscription {
-            client: self.clone(),
-            remote_id,
-            consumed: false,
-            _entity_type: PhantomData,
+        let mut state = self.state.write();
+        if state.entities_by_type_and_remote_id.contains_key(&id) {
+            return Err(anyhow!("already subscribed to entity"));
+        } else {
+            state
+                .entities_by_type_and_remote_id
+                .insert(id, WeakSubscriber::Pending(Default::default()));
+            Ok(PendingEntitySubscription {
+                client: self.clone(),
+                remote_id,
+                consumed: false,
+                _entity_type: PhantomData,
+            })
         }
     }
 
@@ -1605,14 +1609,17 @@ mod tests {
 
         let _subscription1 = client
             .subscribe_to_entity(1)
+            .unwrap()
             .set_model(&model1, &mut cx.to_async());
         let _subscription2 = client
             .subscribe_to_entity(2)
+            .unwrap()
             .set_model(&model2, &mut cx.to_async());
         // Ensure dropping a subscription for the same entity type still allows receiving of
         // messages for other entity IDs of the same type.
         let subscription3 = client
             .subscribe_to_entity(3)
+            .unwrap()
             .set_model(&model3, &mut cx.to_async());
         drop(subscription3);
 

crates/project/src/project.rs 🔗

@@ -463,7 +463,7 @@ impl Project {
     ) -> Result<ModelHandle<Self>> {
         client.authenticate_and_connect(true, &cx).await?;
 
-        let subscription = client.subscribe_to_entity(remote_id);
+        let subscription = client.subscribe_to_entity(remote_id)?;
         let response = client
             .request_envelope(proto::JoinProject {
                 project_id: remote_id,
@@ -989,6 +989,11 @@ impl Project {
         if self.client_state.is_some() {
             return Err(anyhow!("project was already shared"));
         }
+        self.client_subscriptions.push(
+            self.client
+                .subscribe_to_entity(project_id)?
+                .set_model(&cx.handle(), &mut cx.to_async()),
+        );
 
         for open_buffer in self.opened_buffers.values_mut() {
             match open_buffer {
@@ -1025,12 +1030,6 @@ impl Project {
                 .log_err();
         }
 
-        self.client_subscriptions.push(
-            self.client
-                .subscribe_to_entity(project_id)
-                .set_model(&cx.handle(), &mut cx.to_async()),
-        );
-
         let (metadata_changed_tx, mut metadata_changed_rx) = mpsc::unbounded();
         self.client_state = Some(ProjectClientState::Local {
             remote_id: project_id,