From c0fc19988ba5bcbf3d4635214eed55ca67225c77 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 13 Mar 2023 18:08:01 +0100 Subject: [PATCH] Merge pull request #2274 from zed-industries/leave-on-quit Leave room on quit --- crates/call/src/call.rs | 9 ++-- crates/call/src/room.rs | 51 ++++++++++++++----- crates/collab/src/rpc.rs | 12 +++-- crates/collab/src/tests/integration_tests.rs | 39 ++++++++++---- .../src/tests/randomized_integration_tests.rs | 2 +- crates/collab_ui/src/collab_titlebar_item.rs | 2 +- crates/gpui/src/app.rs | 23 +++++---- crates/rpc/src/proto.rs | 1 + crates/rpc/src/rpc.rs | 2 +- crates/workspace/src/workspace.rs | 5 +- 10 files changed, 103 insertions(+), 43 deletions(-) diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index dfe4f39e0e85bef14a52a22a3c8c0f1b9bfa84b6..f9edddf37433649e340515a03c821572d5c971c9 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -264,12 +264,13 @@ impl ActiveCall { Ok(()) } - pub fn hang_up(&mut self, cx: &mut ModelContext) -> Result<()> { + pub fn hang_up(&mut self, cx: &mut ModelContext) -> Task> { + cx.notify(); if let Some((room, _)) = self.room.take() { - room.update(cx, |room, cx| room.leave(cx))?; - cx.notify(); + room.update(cx, |room, cx| room.leave(cx)) + } else { + Task::ready(Ok(())) } - Ok(()) } pub fn share_project( diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 901ae08fc048cf43c98131566855373d38686225..257ef52c1923232633464f3c4980ef55abc1aa63 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -17,7 +17,7 @@ use language::LanguageRegistry; use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate}; use postage::stream::Stream; use project::Project; -use std::{mem, sync::Arc, time::Duration}; +use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration}; use util::{post_inc, ResultExt, TryFutureExt}; pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30); @@ -64,10 +64,27 @@ pub struct Room { impl Entity for Room { type Event = Event; - fn release(&mut self, _: &mut MutableAppContext) { + fn release(&mut self, cx: &mut MutableAppContext) { if self.status.is_online() { - log::info!("room was released, sending leave message"); - let _ = self.client.send(proto::LeaveRoom {}); + self.leave_internal(cx).detach_and_log_err(cx); + } + } + + fn app_will_quit( + &mut self, + cx: &mut MutableAppContext, + ) -> Option>>> { + if self.status.is_online() { + let leave = self.leave_internal(cx); + Some( + cx.background() + .spawn(async move { + leave.await.log_err(); + }) + .boxed(), + ) + } else { + None } } } @@ -234,13 +251,17 @@ impl Room { && self.pending_call_count == 0 } - pub(crate) fn leave(&mut self, cx: &mut ModelContext) -> Result<()> { + pub(crate) fn leave(&mut self, cx: &mut ModelContext) -> Task> { + cx.notify(); + cx.emit(Event::Left); + self.leave_internal(cx) + } + + fn leave_internal(&mut self, cx: &mut MutableAppContext) -> Task> { if self.status.is_offline() { - return Err(anyhow!("room is offline")); + return Task::ready(Err(anyhow!("room is offline"))); } - cx.notify(); - cx.emit(Event::Left); log::info!("leaving room"); for project in self.shared_projects.drain() { @@ -266,8 +287,12 @@ impl Room { self.live_kit.take(); self.pending_room_update.take(); self.maintain_connection.take(); - self.client.send(proto::LeaveRoom {})?; - Ok(()) + + let leave_room = self.client.request(proto::LeaveRoom {}); + cx.background().spawn(async move { + leave_room.await?; + anyhow::Ok(()) + }) } async fn maintain_connection( @@ -758,10 +783,10 @@ impl Room { this.update(&mut cx, |this, cx| { this.pending_call_count -= 1; if this.should_leave() { - this.leave(cx)?; + this.leave(cx).detach_and_log_err(cx); } - result - })?; + }); + result?; Ok(()) }) } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index d13487440fe42c56808012345e008109589ea7b2..a1d5e858781e77993a53d68281c43c6ac62a917c 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -186,7 +186,7 @@ impl Server { .add_request_handler(create_room) .add_request_handler(join_room) .add_request_handler(rejoin_room) - .add_message_handler(leave_room) + .add_request_handler(leave_room) .add_request_handler(call) .add_request_handler(cancel_call) .add_message_handler(decline_call) @@ -1093,8 +1093,14 @@ async fn rejoin_room( Ok(()) } -async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> { - leave_room_for_session(&session).await +async fn leave_room( + _: proto::LeaveRoom, + response: Response, + session: Session, +) -> Result<()> { + leave_room_for_session(&session).await?; + response.send(proto::Ack {})?; + Ok(()) } async fn call( diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 44a2839f27c750ac795ca6a0aaa936aedbe9ebcf..c48f6b160460594efe615252f9b148f7b187e25d 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -274,10 +274,14 @@ async fn test_basic_calls( } // User A leaves the room. - active_call_a.update(cx_a, |call, cx| { - call.hang_up(cx).unwrap(); - assert!(call.room().is_none()); - }); + active_call_a + .update(cx_a, |call, cx| { + let hang_up = call.hang_up(cx); + assert!(call.room().is_none()); + hang_up + }) + .await + .unwrap(); deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), @@ -557,6 +561,7 @@ async fn test_room_uniqueness( // Client C can successfully call client B after client B leaves the room. active_call_b .update(cx_b, |call, cx| call.hang_up(cx)) + .await .unwrap(); deterministic.run_until_parked(); active_call_c @@ -936,6 +941,7 @@ async fn test_server_restarts( // User D hangs up. active_call_d .update(cx_d, |call, cx| call.hang_up(cx)) + .await .unwrap(); deterministic.run_until_parked(); assert_eq!( @@ -1099,7 +1105,10 @@ async fn test_calls_on_multiple_connections( .unwrap(); // User B hangs up, and user A calls them again. - active_call_b2.update(cx_b2, |call, cx| call.hang_up(cx).unwrap()); + active_call_b2 + .update(cx_b2, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); active_call_a .update(cx_a, |call, cx| { @@ -1134,7 +1143,10 @@ async fn test_calls_on_multiple_connections( assert!(incoming_call_b2.next().await.unwrap().is_some()); // User A hangs up, causing both connections to stop ringing. - active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); + active_call_a + .update(cx_a, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); @@ -1371,7 +1383,10 @@ async fn test_unshare_project( .unwrap(); // When client B leaves the room, the project becomes read-only. - active_call_b.update(cx_b, |call, cx| call.hang_up(cx).unwrap()); + active_call_b + .update(cx_b, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); assert!(project_b.read_with(cx_b, |project, _| project.is_read_only())); @@ -1400,7 +1415,10 @@ async fn test_unshare_project( .unwrap(); // When client A (the host) leaves the room, the project gets unshared and guests are notified. - active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); + active_call_a + .update(cx_a, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); project_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); project_c2.read_with(cx_c, |project, _| { @@ -5455,7 +5473,10 @@ async fn test_contacts( [("user_b".to_string(), "online", "busy")] ); - active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); + active_call_a + .update(cx_a, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); assert_eq!( contacts(&client_a, cx_a), diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs index 950f12d1869168515eba1953d5857e50c74b1e51..960f9bfc1dbb6f6a331e923b0a650448002c02ea 100644 --- a/crates/collab/src/tests/randomized_integration_tests.rs +++ b/crates/collab/src/tests/randomized_integration_tests.rs @@ -641,7 +641,7 @@ async fn randomly_mutate_active_call( if can_hang_up && active_call.read_with(cx, |call, _| call.room().is_some()) => { log::info!("{}: hanging up", client.username); - active_call.update(cx, |call, cx| call.hang_up(cx))?; + active_call.update(cx, |call, cx| call.hang_up(cx)).await?; } _ => {} } diff --git a/crates/collab_ui/src/collab_titlebar_item.rs b/crates/collab_ui/src/collab_titlebar_item.rs index 6f214cfc1780e066c9b4a680c4bd5203e445d00d..040eb9aac3b7fe7b2c40ebd198ddc135e9475e4a 100644 --- a/crates/collab_ui/src/collab_titlebar_item.rs +++ b/crates/collab_ui/src/collab_titlebar_item.rs @@ -342,7 +342,7 @@ impl CollabTitlebarItem { fn leave_call(&mut self, _: &LeaveCall, cx: &mut ViewContext) { ActiveCall::global(cx) .update(cx, |call, cx| call.hang_up(cx)) - .log_err(); + .detach_and_log_err(cx); } fn render_toggle_contacts_button( diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 7fc1de83fdc69257e8a0384b41db839dcfc21d13..ea95f7ed73e5167fca9ff3b5af025529acbcb33d 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -588,17 +588,20 @@ impl MutableAppContext { pub fn quit(&mut self) { let mut futures = Vec::new(); - for model_id in self.cx.models.keys().copied().collect::>() { - let mut model = self.cx.models.remove(&model_id).unwrap(); - futures.extend(model.app_will_quit(self)); - self.cx.models.insert(model_id, model); - } - for view_id in self.cx.views.keys().copied().collect::>() { - let mut view = self.cx.views.remove(&view_id).unwrap(); - futures.extend(view.app_will_quit(self)); - self.cx.views.insert(view_id, view); - } + self.update(|cx| { + for model_id in cx.models.keys().copied().collect::>() { + let mut model = cx.cx.models.remove(&model_id).unwrap(); + futures.extend(model.app_will_quit(cx)); + cx.cx.models.insert(model_id, model); + } + + for view_id in cx.views.keys().copied().collect::>() { + let mut view = cx.cx.views.remove(&view_id).unwrap(); + futures.extend(view.app_will_quit(cx)); + cx.cx.views.insert(view_id, view); + } + }); self.remove_all_windows(); diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 1a56abc7835103428352867b840102ef740c952f..823ffa7a19e120346345b536893adcb869d2a0e5 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -269,6 +269,7 @@ request_messages!( (JoinChannel, JoinChannelResponse), (JoinProject, JoinProjectResponse), (JoinRoom, JoinRoomResponse), + (LeaveRoom, Ack), (RejoinRoom, RejoinRoomResponse), (IncomingCall, Ack), (OpenBufferById, OpenBufferResponse), diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index 4f5f126516a4b4e11171847fc0107940b344e669..bec518b707be38d1083e9ffcf66982e7435b41d1 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -6,4 +6,4 @@ pub use conn::Connection; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 49; +pub const PROTOCOL_VERSION: u32 = 50; diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index a7c80485f6f45cdd78c0b1eb737d776b963725a5..ad8ab3b3120c284347c5b03259e613eefaf77569 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -1068,7 +1068,10 @@ impl Workspace { if answer == Some(1) { return anyhow::Ok(false); } else { - active_call.update(&mut cx, |call, cx| call.hang_up(cx))?; + active_call + .update(&mut cx, |call, cx| call.hang_up(cx)) + .await + .log_err(); } } }