Merge pull request #2274 from zed-industries/leave-on-quit

Antonio Scandurra created

Leave room on quit

Change summary

crates/call/src/call.rs                                 |  9 +
crates/call/src/room.rs                                 | 51 ++++++++--
crates/collab/src/rpc.rs                                | 12 +
crates/collab/src/tests/integration_tests.rs            | 39 ++++++-
crates/collab/src/tests/randomized_integration_tests.rs |  2 
crates/collab_ui/src/collab_titlebar_item.rs            |  2 
crates/gpui/src/app.rs                                  | 23 ++--
crates/rpc/src/proto.rs                                 |  1 
crates/rpc/src/rpc.rs                                   |  2 
crates/workspace/src/workspace.rs                       |  5 
10 files changed, 103 insertions(+), 43 deletions(-)

Detailed changes

crates/call/src/call.rs 🔗

@@ -264,12 +264,13 @@ impl ActiveCall {
         Ok(())
     }
 
-    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
+    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+        cx.notify();
         if let Some((room, _)) = self.room.take() {
-            room.update(cx, |room, cx| room.leave(cx))?;
-            cx.notify();
+            room.update(cx, |room, cx| room.leave(cx))
+        } else {
+            Task::ready(Ok(()))
         }
-        Ok(())
     }
 
     pub fn share_project(

crates/call/src/room.rs 🔗

@@ -17,7 +17,7 @@ use language::LanguageRegistry;
 use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 use postage::stream::Stream;
 use project::Project;
-use std::{mem, sync::Arc, time::Duration};
+use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
 use util::{post_inc, ResultExt, TryFutureExt};
 
 pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
@@ -64,10 +64,27 @@ pub struct Room {
 impl Entity for Room {
     type Event = Event;
 
-    fn release(&mut self, _: &mut MutableAppContext) {
+    fn release(&mut self, cx: &mut MutableAppContext) {
         if self.status.is_online() {
-            log::info!("room was released, sending leave message");
-            let _ = self.client.send(proto::LeaveRoom {});
+            self.leave_internal(cx).detach_and_log_err(cx);
+        }
+    }
+
+    fn app_will_quit(
+        &mut self,
+        cx: &mut MutableAppContext,
+    ) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
+        if self.status.is_online() {
+            let leave = self.leave_internal(cx);
+            Some(
+                cx.background()
+                    .spawn(async move {
+                        leave.await.log_err();
+                    })
+                    .boxed(),
+            )
+        } else {
+            None
         }
     }
 }
@@ -234,13 +251,17 @@ impl Room {
             && self.pending_call_count == 0
     }
 
-    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
+    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+        cx.notify();
+        cx.emit(Event::Left);
+        self.leave_internal(cx)
+    }
+
+    fn leave_internal(&mut self, cx: &mut MutableAppContext) -> Task<Result<()>> {
         if self.status.is_offline() {
-            return Err(anyhow!("room is offline"));
+            return Task::ready(Err(anyhow!("room is offline")));
         }
 
-        cx.notify();
-        cx.emit(Event::Left);
         log::info!("leaving room");
 
         for project in self.shared_projects.drain() {
@@ -266,8 +287,12 @@ impl Room {
         self.live_kit.take();
         self.pending_room_update.take();
         self.maintain_connection.take();
-        self.client.send(proto::LeaveRoom {})?;
-        Ok(())
+
+        let leave_room = self.client.request(proto::LeaveRoom {});
+        cx.background().spawn(async move {
+            leave_room.await?;
+            anyhow::Ok(())
+        })
     }
 
     async fn maintain_connection(
@@ -758,10 +783,10 @@ impl Room {
             this.update(&mut cx, |this, cx| {
                 this.pending_call_count -= 1;
                 if this.should_leave() {
-                    this.leave(cx)?;
+                    this.leave(cx).detach_and_log_err(cx);
                 }
-                result
-            })?;
+            });
+            result?;
             Ok(())
         })
     }

crates/collab/src/rpc.rs 🔗

@@ -186,7 +186,7 @@ impl Server {
             .add_request_handler(create_room)
             .add_request_handler(join_room)
             .add_request_handler(rejoin_room)
-            .add_message_handler(leave_room)
+            .add_request_handler(leave_room)
             .add_request_handler(call)
             .add_request_handler(cancel_call)
             .add_message_handler(decline_call)
@@ -1102,8 +1102,14 @@ async fn rejoin_room(
     Ok(())
 }
 
-async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> {
-    leave_room_for_session(&session).await
+async fn leave_room(
+    _: proto::LeaveRoom,
+    response: Response<proto::LeaveRoom>,
+    session: Session,
+) -> Result<()> {
+    leave_room_for_session(&session).await?;
+    response.send(proto::Ack {})?;
+    Ok(())
 }
 
 async fn call(

crates/collab/src/tests/integration_tests.rs 🔗

@@ -274,10 +274,14 @@ async fn test_basic_calls(
     }
 
     // User A leaves the room.
-    active_call_a.update(cx_a, |call, cx| {
-        call.hang_up(cx).unwrap();
-        assert!(call.room().is_none());
-    });
+    active_call_a
+        .update(cx_a, |call, cx| {
+            let hang_up = call.hang_up(cx);
+            assert!(call.room().is_none());
+            hang_up
+        })
+        .await
+        .unwrap();
     deterministic.run_until_parked();
     assert_eq!(
         room_participants(&room_a, cx_a),
@@ -557,6 +561,7 @@ async fn test_room_uniqueness(
     // Client C can successfully call client B after client B leaves the room.
     active_call_b
         .update(cx_b, |call, cx| call.hang_up(cx))
+        .await
         .unwrap();
     deterministic.run_until_parked();
     active_call_c
@@ -936,6 +941,7 @@ async fn test_server_restarts(
     // User D hangs up.
     active_call_d
         .update(cx_d, |call, cx| call.hang_up(cx))
+        .await
         .unwrap();
     deterministic.run_until_parked();
     assert_eq!(
@@ -1099,7 +1105,10 @@ async fn test_calls_on_multiple_connections(
         .unwrap();
 
     // User B hangs up, and user A calls them again.
-    active_call_b2.update(cx_b2, |call, cx| call.hang_up(cx).unwrap());
+    active_call_b2
+        .update(cx_b2, |call, cx| call.hang_up(cx))
+        .await
+        .unwrap();
     deterministic.run_until_parked();
     active_call_a
         .update(cx_a, |call, cx| {
@@ -1134,7 +1143,10 @@ async fn test_calls_on_multiple_connections(
     assert!(incoming_call_b2.next().await.unwrap().is_some());
 
     // User A hangs up, causing both connections to stop ringing.
-    active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap());
+    active_call_a
+        .update(cx_a, |call, cx| call.hang_up(cx))
+        .await
+        .unwrap();
     deterministic.run_until_parked();
     assert!(incoming_call_b1.next().await.unwrap().is_none());
     assert!(incoming_call_b2.next().await.unwrap().is_none());
@@ -1371,7 +1383,10 @@ async fn test_unshare_project(
         .unwrap();
 
     // When client B leaves the room, the project becomes read-only.
-    active_call_b.update(cx_b, |call, cx| call.hang_up(cx).unwrap());
+    active_call_b
+        .update(cx_b, |call, cx| call.hang_up(cx))
+        .await
+        .unwrap();
     deterministic.run_until_parked();
     assert!(project_b.read_with(cx_b, |project, _| project.is_read_only()));
 
@@ -1400,7 +1415,10 @@ async fn test_unshare_project(
         .unwrap();
 
     // When client A (the host) leaves the room, the project gets unshared and guests are notified.
-    active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap());
+    active_call_a
+        .update(cx_a, |call, cx| call.hang_up(cx))
+        .await
+        .unwrap();
     deterministic.run_until_parked();
     project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
     project_c2.read_with(cx_c, |project, _| {
@@ -5455,7 +5473,10 @@ async fn test_contacts(
         [("user_b".to_string(), "online", "busy")]
     );
 
-    active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap());
+    active_call_a
+        .update(cx_a, |call, cx| call.hang_up(cx))
+        .await
+        .unwrap();
     deterministic.run_until_parked();
     assert_eq!(
         contacts(&client_a, cx_a),

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -641,7 +641,7 @@ async fn randomly_mutate_active_call(
                 if can_hang_up && active_call.read_with(cx, |call, _| call.room().is_some()) =>
             {
                 log::info!("{}: hanging up", client.username);
-                active_call.update(cx, |call, cx| call.hang_up(cx))?;
+                active_call.update(cx, |call, cx| call.hang_up(cx)).await?;
             }
             _ => {}
         }

crates/collab_ui/src/collab_titlebar_item.rs 🔗

@@ -342,7 +342,7 @@ impl CollabTitlebarItem {
     fn leave_call(&mut self, _: &LeaveCall, cx: &mut ViewContext<Self>) {
         ActiveCall::global(cx)
             .update(cx, |call, cx| call.hang_up(cx))
-            .log_err();
+            .detach_and_log_err(cx);
     }
 
     fn render_toggle_contacts_button(

crates/gpui/src/app.rs 🔗

@@ -588,17 +588,20 @@ impl MutableAppContext {
 
     pub fn quit(&mut self) {
         let mut futures = Vec::new();
-        for model_id in self.cx.models.keys().copied().collect::<Vec<_>>() {
-            let mut model = self.cx.models.remove(&model_id).unwrap();
-            futures.extend(model.app_will_quit(self));
-            self.cx.models.insert(model_id, model);
-        }
 
-        for view_id in self.cx.views.keys().copied().collect::<Vec<_>>() {
-            let mut view = self.cx.views.remove(&view_id).unwrap();
-            futures.extend(view.app_will_quit(self));
-            self.cx.views.insert(view_id, view);
-        }
+        self.update(|cx| {
+            for model_id in cx.models.keys().copied().collect::<Vec<_>>() {
+                let mut model = cx.cx.models.remove(&model_id).unwrap();
+                futures.extend(model.app_will_quit(cx));
+                cx.cx.models.insert(model_id, model);
+            }
+
+            for view_id in cx.views.keys().copied().collect::<Vec<_>>() {
+                let mut view = cx.cx.views.remove(&view_id).unwrap();
+                futures.extend(view.app_will_quit(cx));
+                cx.cx.views.insert(view_id, view);
+            }
+        });
 
         self.remove_all_windows();
 

crates/rpc/src/proto.rs 🔗

@@ -269,6 +269,7 @@ request_messages!(
     (JoinChannel, JoinChannelResponse),
     (JoinProject, JoinProjectResponse),
     (JoinRoom, JoinRoomResponse),
+    (LeaveRoom, Ack),
     (RejoinRoom, RejoinRoomResponse),
     (IncomingCall, Ack),
     (OpenBufferById, OpenBufferResponse),

crates/rpc/src/rpc.rs 🔗

@@ -6,4 +6,4 @@ pub use conn::Connection;
 pub use peer::*;
 mod macros;
 
-pub const PROTOCOL_VERSION: u32 = 49;
+pub const PROTOCOL_VERSION: u32 = 50;

crates/workspace/src/workspace.rs 🔗

@@ -1064,7 +1064,10 @@ impl Workspace {
                     if answer == Some(1) {
                         return anyhow::Ok(false);
                     } else {
-                        active_call.update(&mut cx, |call, cx| call.hang_up(cx))?;
+                        active_call
+                            .update(&mut cx, |call, cx| call.hang_up(cx))
+                            .await
+                            .log_err();
                     }
                 }
             }