Leave room when `Room` entity is dropped

Antonio Scandurra created

Change summary

crates/collab/src/integration_tests.rs | 21 ++++++++++++--
crates/collab/src/rpc.rs               |  9 ++++++
crates/collab/src/rpc/store.rs         | 41 +++++++++++++++++++++++++--
crates/room/src/room.rs                | 32 ++++++++++++--------
crates/rpc/proto/zed.proto             |  5 +++
crates/rpc/src/proto.rs                |  1 
6 files changed, 89 insertions(+), 20 deletions(-)

Detailed changes

crates/collab/src/integration_tests.rs 🔗

@@ -62,7 +62,7 @@ fn init_logger() {
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_share_project_in_room(
+async fn test_basic_calls(
     deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
@@ -111,6 +111,7 @@ async fn test_share_project_in_room(
     let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
     // room.publish_project(project_a.clone()).await.unwrap();
 
+    // Call user B from client A.
     let mut incoming_call_b = client_b
         .user_store
         .update(cx_b, |user, _| user.incoming_call());
@@ -128,6 +129,7 @@ async fn test_share_project_in_room(
         }
     );
 
+    // User B receives the call and joins the room.
     let call_b = incoming_call_b.next().await.unwrap().unwrap();
     let room_b = cx_b
         .update(|cx| Room::join(&call_b, client_b.clone(), cx))
@@ -151,11 +153,12 @@ async fn test_share_project_in_room(
         }
     );
 
+    // Call user C from client B.
     let mut incoming_call_c = client_c
         .user_store
         .update(cx_c, |user, _| user.incoming_call());
-    room_a
-        .update(cx_a, |room, cx| room.call(client_c.user_id().unwrap(), cx))
+    room_b
+        .update(cx_b, |room, cx| room.call(client_c.user_id().unwrap(), cx))
         .await
         .unwrap();
 
@@ -175,6 +178,7 @@ async fn test_share_project_in_room(
         }
     );
 
+    // User C receives the call, but declines it.
     let _call_c = incoming_call_c.next().await.unwrap().unwrap();
     client_c
         .user_store
@@ -198,6 +202,17 @@ async fn test_share_project_in_room(
         }
     );
 
+    // User A leaves the room.
+    cx_a.update(|_| drop(room_a));
+    deterministic.run_until_parked();
+    assert_eq!(
+        participants(&room_b, &client_b, cx_b).await,
+        RoomParticipants {
+            remote: Default::default(),
+            pending: Default::default()
+        }
+    );
+
     #[derive(Debug, Eq, PartialEq)]
     struct RoomParticipants {
         remote: Vec<String>,

crates/collab/src/rpc.rs 🔗

@@ -153,6 +153,7 @@ impl Server {
             .add_request_handler(Server::ping)
             .add_request_handler(Server::create_room)
             .add_request_handler(Server::join_room)
+            .add_message_handler(Server::leave_room)
             .add_request_handler(Server::call)
             .add_message_handler(Server::decline_call)
             .add_request_handler(Server::register_project)
@@ -627,6 +628,14 @@ impl Server {
         Ok(())
     }
 
+    async fn leave_room(self: Arc<Server>, message: TypedEnvelope<proto::LeaveRoom>) -> Result<()> {
+        let room_id = message.payload.id;
+        let mut store = self.store().await;
+        let room = store.leave_room(room_id, message.sender_id)?;
+        self.room_updated(room);
+        Ok(())
+    }
+
     async fn call(
         self: Arc<Server>,
         request: TypedEnvelope<proto::Call>,

crates/collab/src/rpc/store.rs 🔗

@@ -38,7 +38,7 @@ struct ConnectionState {
 
 #[derive(Copy, Clone, Eq, PartialEq, Serialize)]
 enum RoomState {
-    Joined,
+    Joined { room_id: RoomId },
     Calling { room_id: RoomId },
 }
 
@@ -370,13 +370,13 @@ impl Store {
 
         let room_id = post_inc(&mut self.next_room_id);
         self.rooms.insert(room_id, room);
-        user_connection_state.room = Some(RoomState::Joined);
+        user_connection_state.room = Some(RoomState::Joined { room_id });
         Ok(room_id)
     }
 
     pub fn join_room(
         &mut self,
-        room_id: u64,
+        room_id: RoomId,
         connection_id: ConnectionId,
     ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
         let connection = self
@@ -417,11 +417,44 @@ impl Store {
                 )),
             }),
         });
-        user_connection_state.room = Some(RoomState::Joined);
+        user_connection_state.room = Some(RoomState::Joined { room_id });
 
         Ok((room, recipient_connection_ids))
     }
 
+    pub fn leave_room(
+        &mut self,
+        room_id: RoomId,
+        connection_id: ConnectionId,
+    ) -> Result<&proto::Room> {
+        let connection = self
+            .connections
+            .get_mut(&connection_id)
+            .ok_or_else(|| anyhow!("no such connection"))?;
+        let user_id = connection.user_id;
+
+        let mut user_connection_state = self
+            .connections_by_user_id
+            .get_mut(&user_id)
+            .ok_or_else(|| anyhow!("no such connection"))?;
+        anyhow::ensure!(
+            user_connection_state
+                .room
+                .map_or(false, |room| room == RoomState::Joined { room_id }),
+            "cannot leave a room before joining it"
+        );
+
+        let room = self
+            .rooms
+            .get_mut(&room_id)
+            .ok_or_else(|| anyhow!("no such room"))?;
+        room.participants
+            .retain(|participant| participant.peer_id != connection_id.0);
+        user_connection_state.room = None;
+
+        Ok(room)
+    }
+
     pub fn call(
         &mut self,
         room_id: RoomId,

crates/room/src/room.rs 🔗

@@ -31,6 +31,19 @@ impl Entity for Room {
 }
 
 impl Room {
+    fn new(id: u64, client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
+        Self {
+            id,
+            local_participant: LocalParticipant {
+                projects: Default::default(),
+            },
+            remote_participants: Default::default(),
+            pending_user_ids: Default::default(),
+            _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
+            client,
+        }
+    }
+
     pub fn create(
         client: Arc<Client>,
         cx: &mut MutableAppContext,
@@ -56,19 +69,6 @@ impl Room {
         })
     }
 
-    fn new(id: u64, client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
-        Self {
-            id,
-            local_participant: LocalParticipant {
-                projects: Default::default(),
-            },
-            remote_participants: Default::default(),
-            pending_user_ids: Default::default(),
-            _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
-            client,
-        }
-    }
-
     pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
         &self.remote_participants
     }
@@ -148,3 +148,9 @@ impl Room {
         todo!()
     }
 }
+
+impl Drop for Room {
+    fn drop(&mut self) {
+        let _ = self.client.send(proto::LeaveRoom { id: self.id });
+    }
+}

crates/rpc/proto/zed.proto 🔗

@@ -15,6 +15,7 @@ message Envelope {
         CreateRoomResponse create_room_response = 9;
         JoinRoom join_room = 10;
         JoinRoomResponse join_room_response = 11;
+        LeaveRoom leave_room = 1002;
         Call call = 12;
         IncomingCall incoming_call = 1000;
         CancelCall cancel_call = 1001;
@@ -149,6 +150,10 @@ message JoinRoomResponse {
     Room room = 1;
 }
 
+message LeaveRoom {
+    uint64 id = 1;
+}
+
 message Room {
     repeated Participant participants = 1;
     repeated uint64 pending_user_ids = 2;

crates/rpc/src/proto.rs 🔗

@@ -131,6 +131,7 @@ messages!(
     (JoinRoomResponse, Foreground),
     (LeaveChannel, Foreground),
     (LeaveProject, Foreground),
+    (LeaveRoom, Foreground),
     (OpenBufferById, Background),
     (OpenBufferByPath, Background),
     (OpenBufferForSymbol, Background),