diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 08463413257b4b10972ad08ab7c64d181599d914..ca1a60bd631c885924da7e6a8ca8b3c5ce2aa114 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -10,7 +10,7 @@ use client::{ ZED_ALWAYS_ACTIVE, }; use collections::HashSet; -use futures::{future::Shared, FutureExt}; +use futures::{channel::oneshot, future::Shared, Future, FutureExt}; use gpui::{ AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task, WeakModelHandle, @@ -37,10 +37,42 @@ pub struct IncomingCall { pub initial_project: Option, } +pub struct OneAtATime { + cancel: Option>, +} + +impl OneAtATime { + /// spawn a task in the given context. + /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None) + /// otherwise you'll see the result of the task. + fn spawn(&mut self, cx: &mut AppContext, f: F) -> Task>> + where + F: 'static + FnOnce(AsyncAppContext) -> Fut, + Fut: Future>, + R: 'static, + { + let (tx, rx) = oneshot::channel(); + self.cancel.replace(tx); + cx.spawn(|cx| async move { + futures::select_biased! { + _ = rx.fuse() => Ok(None), + result = f(cx).fuse() => result.map(Some), + } + }) + } + + fn running(&self) -> bool { + self.cancel + .as_ref() + .is_some_and(|cancel| !cancel.is_canceled()) + } +} + /// Singleton global maintaining the user's participation in a room across workspaces. pub struct ActiveCall { room: Option<(ModelHandle, Vec)>, pending_room_creation: Option, Arc>>>>, + _join_debouncer: OneAtATime, location: Option>, pending_invites: HashSet, incoming_call: ( @@ -69,6 +101,7 @@ impl ActiveCall { pending_invites: Default::default(), incoming_call: watch::channel(), + _join_debouncer: OneAtATime { cancel: None }, _subscriptions: vec![ client.add_request_handler(cx.handle(), Self::handle_incoming_call), client.add_message_handler(cx.handle(), Self::handle_call_canceled), @@ -143,6 +176,10 @@ impl ActiveCall { } cx.notify(); + if self._join_debouncer.running() { + return Task::ready(Ok(())); + } + let room = if let Some(room) = self.room().cloned() { Some(Task::ready(Ok(room)).shared()) } else { @@ -259,11 +296,20 @@ impl ActiveCall { return Task::ready(Err(anyhow!("no incoming call"))); }; - let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx); + if self.pending_room_creation.is_some() { + return Task::ready(Ok(())); + } + + let room_id = call.room_id.clone(); + let client = self.client.clone(); + let user_store = self.user_store.clone(); + let join = self + ._join_debouncer + .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx)); cx.spawn(|this, mut cx| async move { let room = join.await?; - this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)) + this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx)) .await?; this.update(&mut cx, |this, cx| { this.report_call_event("accept incoming", cx) @@ -290,20 +336,28 @@ impl ActiveCall { &mut self, channel_id: u64, cx: &mut ModelContext, - ) -> Task>> { + ) -> Task>>> { if let Some(room) = self.room().cloned() { if room.read(cx).channel_id() == Some(channel_id) { - return Task::ready(Ok(room)); + return Task::ready(Ok(Some(room))); } else { room.update(cx, |room, cx| room.clear_state(cx)); } } - let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx); + if self.pending_room_creation.is_some() { + return Task::ready(Ok(None)); + } - cx.spawn(|this, mut cx| async move { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + let join = self._join_debouncer.spawn(cx, move |cx| async move { + Room::join_channel(channel_id, client, user_store, cx).await + }); + + cx.spawn(move |this, mut cx| async move { let room = join.await?; - this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)) + this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx)) .await?; this.update(&mut cx, |this, cx| { this.report_call_event("join channel", cx) @@ -457,3 +511,40 @@ pub fn report_call_event_for_channel( }; telemetry.report_clickhouse_event(event, telemetry_settings); } + +#[cfg(test)] +mod test { + use gpui::TestAppContext; + + use crate::OneAtATime; + + #[gpui::test] + async fn test_one_at_a_time(cx: &mut TestAppContext) { + let mut one_at_a_time = OneAtATime { cancel: None }; + + assert_eq!( + cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) })) + .await + .unwrap(), + Some(1) + ); + + let (a, b) = cx.update(|cx| { + ( + one_at_a_time.spawn(cx, |_| async { + assert!(false); + Ok(2) + }), + one_at_a_time.spawn(cx, |_| async { Ok(3) }), + ) + }); + + assert_eq!(a.await.unwrap(), None); + assert_eq!(b.await.unwrap(), Some(3)); + + let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) })); + drop(one_at_a_time); + + assert_eq!(promise.await.unwrap(), None); + } +} diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 594d1b7892a72723c652382e1979a8334703d9c9..3ff5a3490161ce12958eaa688a921e5094bcd76f 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -1,7 +1,6 @@ use crate::{ call_settings::CallSettings, participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack}, - IncomingCall, }; use anyhow::{anyhow, Result}; use audio::{Audio, Sound}; @@ -291,37 +290,32 @@ impl Room { }) } - pub(crate) fn join_channel( + pub(crate) async fn join_channel( channel_id: u64, client: Arc, user_store: ModelHandle, - cx: &mut AppContext, - ) -> Task>> { - cx.spawn(|cx| async move { - Self::from_join_response( - client.request(proto::JoinChannel { channel_id }).await?, - client, - user_store, - cx, - ) - }) + cx: AsyncAppContext, + ) -> Result> { + Self::from_join_response( + client.request(proto::JoinChannel { channel_id }).await?, + client, + user_store, + cx, + ) } - pub(crate) fn join( - call: &IncomingCall, + pub(crate) async fn join( + room_id: u64, client: Arc, user_store: ModelHandle, - cx: &mut AppContext, - ) -> Task>> { - let id = call.room_id; - cx.spawn(|cx| async move { - Self::from_join_response( - client.request(proto::JoinRoom { id }).await?, - client, - user_store, - cx, - ) - }) + cx: AsyncAppContext, + ) -> Result> { + Self::from_join_response( + client.request(proto::JoinRoom { id: room_id }).await?, + client, + user_store, + cx, + ) } pub fn mute_on_join(cx: &AppContext) -> bool { diff --git a/crates/collab/src/tests.rs b/crates/collab/src/tests.rs index 139910e1f6f25281139865f0b80b8e20e15648d0..e8da66a75ac02a17fbd70b7cc139f757c09901c6 100644 --- a/crates/collab/src/tests.rs +++ b/crates/collab/src/tests.rs @@ -40,3 +40,7 @@ fn room_participants(room: &ModelHandle, cx: &mut TestAppContext) -> RoomP RoomParticipants { remote, pending } }) } + +fn channel_id(room: &ModelHandle, cx: &mut TestAppContext) -> Option { + cx.read(|cx| room.read(cx).channel_id()) +} diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index ea8c1bd4b378e80de133b5290ed3d6f293984ab8..550c3a2bd8ade7b0bb8e8df3481ae54549079af8 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -1,6 +1,6 @@ use crate::{ rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, - tests::{room_participants, RoomParticipants, TestClient, TestServer}, + tests::{channel_id, room_participants, RoomParticipants, TestClient, TestServer}, }; use call::{room, ActiveCall, ParticipantLocation, Room}; use client::{User, RECEIVE_TIMEOUT}; @@ -469,6 +469,119 @@ async fn test_calling_multiple_users_simultaneously( ); } +#[gpui::test(iterations = 10)] +async fn test_joining_channels_and_calling_multiple_users_simultaneously( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + server + .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) + .await; + + let channel_1 = server + .make_channel( + "channel1", + None, + (&client_a, cx_a), + &mut [(&client_b, cx_b), (&client_c, cx_c)], + ) + .await; + + let channel_2 = server + .make_channel( + "channel2", + None, + (&client_a, cx_a), + &mut [(&client_b, cx_b), (&client_c, cx_c)], + ) + .await; + + let active_call_a = cx_a.read(ActiveCall::global); + + // Simultaneously join channel 1 and then channel 2 + active_call_a + .update(cx_a, |call, cx| call.join_channel(channel_1, cx)) + .detach(); + let join_channel_2 = active_call_a.update(cx_a, |call, cx| call.join_channel(channel_2, cx)); + + join_channel_2.await.unwrap(); + + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + deterministic.run_until_parked(); + + assert_eq!(channel_id(&room_a, cx_a), Some(channel_2)); + + // Leave the room + active_call_a + .update(cx_a, |call, cx| { + let hang_up = call.hang_up(cx); + hang_up + }) + .await + .unwrap(); + + // Initiating invites and then joining a channel should fail gracefully + let b_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }); + let c_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }); + + let join_channel = active_call_a.update(cx_a, |call, cx| call.join_channel(channel_1, cx)); + + b_invite.await.unwrap(); + c_invite.await.unwrap(); + join_channel.await.unwrap(); + + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + deterministic.run_until_parked(); + + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: Default::default(), + pending: vec!["user_b".to_string(), "user_c".to_string()] + } + ); + + assert_eq!(channel_id(&room_a, cx_a), None); + + // Leave the room + active_call_a + .update(cx_a, |call, cx| { + let hang_up = call.hang_up(cx); + hang_up + }) + .await + .unwrap(); + + // Simultaneously join channel 1 and call user B and user C from client A. + let join_channel = active_call_a.update(cx_a, |call, cx| call.join_channel(channel_1, cx)); + + let b_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }); + let c_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }); + + join_channel.await.unwrap(); + b_invite.await.unwrap(); + c_invite.await.unwrap(); + + active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + deterministic.run_until_parked(); +} + #[gpui::test(iterations = 10)] async fn test_room_uniqueness( deterministic: Arc, diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index e879b981ef65f55ab221ddd888b84d025dbd1ef7..f0b2c7b0765d61b9a5173f1e97fa15e04faf20e5 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -4238,6 +4238,10 @@ async fn join_channel_internal( }) .await?; + let Some(room) = room else { + return anyhow::Ok(true); + }; + room.update(cx, |room, _| room.room_update_completed()) .await;