Leave room when client is the only participant

Antonio Scandurra created

Change summary

crates/call/src/call.rs                | 37 ++++++++-------
crates/call/src/room.rs                | 66 ++++++++++++++++++++++++---
crates/collab/src/integration_tests.rs |  2 
3 files changed, 78 insertions(+), 27 deletions(-)

Detailed changes

crates/call/src/call.rs 🔗

@@ -109,31 +109,32 @@ impl ActiveCall {
         initial_project: Option<ModelHandle<Project>>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<()>> {
-        let room = self.room.as_ref().map(|(room, _)| room.clone());
         let client = self.client.clone();
         let user_store = self.user_store.clone();
         cx.spawn(|this, mut cx| async move {
-            let room = if let Some(room) = room {
-                room
-            } else {
-                cx.update(|cx| Room::create(client, user_store, cx)).await?
-            };
+            if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
+                let initial_project_id = if let Some(initial_project) = initial_project {
+                    Some(
+                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
+                            .await?,
+                    )
+                } else {
+                    None
+                };
 
-            let initial_project_id = if let Some(initial_project) = initial_project {
-                Some(
-                    room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
-                        .await?,
-                )
+                room.update(&mut cx, |room, cx| {
+                    room.call(recipient_user_id, initial_project_id, cx)
+                })
+                .await?;
             } else {
-                None
+                let room = cx
+                    .update(|cx| {
+                        Room::create(recipient_user_id, initial_project, client, user_store, cx)
+                    })
+                    .await?;
+                this.update(&mut cx, |this, cx| this.set_room(Some(room), cx));
             };
 
-            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx));
-            room.update(&mut cx, |room, cx| {
-                room.call(recipient_user_id, initial_project_id, cx)
-            })
-            .await?;
-
             Ok(())
         })
     }

crates/call/src/room.rs 🔗

@@ -21,9 +21,10 @@ pub struct Room {
     status: RoomStatus,
     remote_participants: HashMap<PeerId, RemoteParticipant>,
     pending_users: Vec<Arc<User>>,
+    pending_call_count: usize,
     client: Arc<Client>,
     user_store: ModelHandle<UserStore>,
-    _subscriptions: Vec<client::Subscription>,
+    subscriptions: Vec<client::Subscription>,
     _pending_room_update: Option<Task<()>>,
 }
 
@@ -62,7 +63,8 @@ impl Room {
             status: RoomStatus::Online,
             remote_participants: Default::default(),
             pending_users: Default::default(),
-            _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
+            pending_call_count: 0,
+            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
             _pending_room_update: None,
             client,
             user_store,
@@ -70,13 +72,40 @@ impl Room {
     }
 
     pub(crate) fn create(
+        recipient_user_id: u64,
+        initial_project: Option<ModelHandle<Project>>,
         client: Arc<Client>,
         user_store: ModelHandle<UserStore>,
         cx: &mut MutableAppContext,
     ) -> Task<Result<ModelHandle<Self>>> {
         cx.spawn(|mut cx| async move {
-            let room = client.request(proto::CreateRoom {}).await?;
-            Ok(cx.add_model(|cx| Self::new(room.id, client, user_store, cx)))
+            let response = client.request(proto::CreateRoom {}).await?;
+            let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
+            let initial_project_id = if let Some(initial_project) = initial_project {
+                let initial_project_id = room
+                    .update(&mut cx, |room, cx| {
+                        room.share_project(initial_project.clone(), cx)
+                    })
+                    .await?;
+                initial_project
+                    .update(&mut cx, |project, cx| {
+                        project.shared(initial_project_id, cx)
+                    })
+                    .await?;
+                Some(initial_project_id)
+            } else {
+                None
+            };
+
+            match room
+                .update(&mut cx, |room, cx| {
+                    room.call(recipient_user_id, initial_project_id, cx)
+                })
+                .await
+            {
+                Ok(()) => Ok(room),
+                Err(_) => Err(anyhow!("call failed")),
+            }
         })
     }
 
@@ -96,6 +125,12 @@ impl Room {
         })
     }
 
+    fn should_leave(&self) -> bool {
+        self.pending_users.is_empty()
+            && self.remote_participants.is_empty()
+            && self.pending_call_count == 0
+    }
+
     pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
         if self.status.is_offline() {
             return Err(anyhow!("room is offline"));
@@ -104,6 +139,7 @@ impl Room {
         cx.notify();
         self.status = RoomStatus::Offline;
         self.remote_participants.clear();
+        self.subscriptions.clear();
         self.client.send(proto::LeaveRoom { id: self.id })?;
         Ok(())
     }
@@ -134,8 +170,7 @@ impl Room {
             .payload
             .room
             .ok_or_else(|| anyhow!("invalid room"))?;
-        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?;
-        Ok(())
+        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
     }
 
     fn apply_room_update(
@@ -209,6 +244,10 @@ impl Room {
                     this.pending_users = pending_users;
                     cx.notify();
                 }
+
+                if this.should_leave() {
+                    let _ = this.leave(cx);
+                }
             });
         }));
 
@@ -226,16 +265,25 @@ impl Room {
             return Task::ready(Err(anyhow!("room is offline")));
         }
 
+        cx.notify();
         let client = self.client.clone();
         let room_id = self.id;
-        cx.foreground().spawn(async move {
-            client
+        self.pending_call_count += 1;
+        cx.spawn(|this, mut cx| async move {
+            let result = client
                 .request(proto::Call {
                     room_id,
                     recipient_user_id,
                     initial_project_id,
                 })
-                .await?;
+                .await;
+            this.update(&mut cx, |this, cx| {
+                this.pending_call_count -= 1;
+                if this.should_leave() {
+                    this.leave(cx)?;
+                }
+                result
+            })?;
             Ok(())
         })
     }

crates/collab/src/integration_tests.rs 🔗

@@ -383,9 +383,11 @@ async fn test_leaving_room_on_disconnection(
         }
     );
 
+    // When user A disconnects, both client A and B clear their room on the active call.
     server.disconnect_client(client_a.current_user_id(cx_a));
     cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
     active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none()));
+    active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none()));
     assert_eq!(
         room_participants(&room_a, cx_a),
         RoomParticipants {