diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index e17a3667bae639186063685923a53a71867ffa27..97ba2fc97d92c1d245a772f35784bf32511bb2e4 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -155,17 +155,17 @@ impl Room { let connect = room.connect(&connection_info.server_url, &connection_info.token); cx.spawn(|this, mut cx| async move { connect.await?; - - let is_read_only = this - .update(&mut cx, |room, _| room.read_only()) - .unwrap_or(true); - - if !cx.update(|cx| Self::mute_on_join(cx))? && !is_read_only { - this.update(&mut cx, |this, cx| this.share_microphone(cx))? - .await?; - } - - anyhow::Ok(()) + this.update(&mut cx, |this, cx| { + if !this.read_only() { + if let Some(live_kit) = &this.live_kit { + if !live_kit.muted_by_user && !live_kit.deafened { + return this.share_microphone(cx); + } + } + } + Task::ready(Ok(())) + })? + .await }) .detach_and_log_err(cx); @@ -174,7 +174,7 @@ impl Room { screen_track: LocalTrack::None, microphone_track: LocalTrack::None, next_publish_id: 0, - muted_by_user: false, + muted_by_user: Self::mute_on_join(cx), deafened: false, speaking: false, _maintain_room, @@ -1047,6 +1047,15 @@ impl Room { } RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => { + if let Some(live_kit) = &self.live_kit { + if live_kit.deafened { + track.stop(); + cx.foreground_executor() + .spawn(publication.set_enabled(false)) + .detach(); + } + } + let user_id = track.publisher_id().parse()?; let track_id = track.sid().to_string(); let participant = self @@ -1301,15 +1310,12 @@ impl Room { }) } - pub fn is_muted(&self, cx: &AppContext) -> bool { - self.live_kit - .as_ref() - .and_then(|live_kit| match &live_kit.microphone_track { - LocalTrack::None => Some(Self::mute_on_join(cx)), - LocalTrack::Pending { muted, .. } => Some(*muted), - LocalTrack::Published { muted, .. } => Some(*muted), - }) - .unwrap_or(false) + pub fn is_muted(&self) -> bool { + self.live_kit.as_ref().map_or(false, |live_kit| { + matches!(live_kit.microphone_track, LocalTrack::None) + || live_kit.muted_by_user + || live_kit.deafened + }) } pub fn read_only(&self) -> bool { @@ -1331,16 +1337,11 @@ impl Room { pub fn share_microphone(&mut self, cx: &mut ModelContext) -> Task> { if self.status.is_offline() { return Task::ready(Err(anyhow!("room is offline"))); - } else if self.is_sharing_mic() { - return Task::ready(Err(anyhow!("microphone was already shared"))); } let publish_id = if let Some(live_kit) = self.live_kit.as_mut() { let publish_id = post_inc(&mut live_kit.next_publish_id); - live_kit.microphone_track = LocalTrack::Pending { - publish_id, - muted: false, - }; + live_kit.microphone_track = LocalTrack::Pending { publish_id }; cx.notify(); publish_id } else { @@ -1369,14 +1370,13 @@ impl Room { .as_mut() .ok_or_else(|| anyhow!("live-kit was not initialized"))?; - let (canceled, muted) = if let LocalTrack::Pending { + let canceled = if let LocalTrack::Pending { publish_id: cur_publish_id, - muted, } = &live_kit.microphone_track { - (*cur_publish_id != publish_id, *muted) + *cur_publish_id != publish_id } else { - (true, false) + true }; match publication { @@ -1384,14 +1384,13 @@ impl Room { if canceled { live_kit.room.unpublish_track(publication); } else { - if muted { + if live_kit.muted_by_user || live_kit.deafened { cx.background_executor() - .spawn(publication.set_mute(muted)) + .spawn(publication.set_mute(true)) .detach(); } live_kit.microphone_track = LocalTrack::Published { track_publication: publication, - muted, }; cx.notify(); } @@ -1420,10 +1419,7 @@ impl Room { let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() { let publish_id = post_inc(&mut live_kit.next_publish_id); - live_kit.screen_track = LocalTrack::Pending { - publish_id, - muted: false, - }; + live_kit.screen_track = LocalTrack::Pending { publish_id }; cx.notify(); (live_kit.room.display_sources(), publish_id) } else { @@ -1457,14 +1453,13 @@ impl Room { .as_mut() .ok_or_else(|| anyhow!("live-kit was not initialized"))?; - let (canceled, muted) = if let LocalTrack::Pending { + let canceled = if let LocalTrack::Pending { publish_id: cur_publish_id, - muted, } = &live_kit.screen_track { - (*cur_publish_id != publish_id, *muted) + *cur_publish_id != publish_id } else { - (true, false) + true }; match publication { @@ -1472,14 +1467,8 @@ impl Room { if canceled { live_kit.room.unpublish_track(publication); } else { - if muted { - cx.background_executor() - .spawn(publication.set_mute(muted)) - .detach(); - } live_kit.screen_track = LocalTrack::Published { track_publication: publication, - muted, }; cx.notify(); } @@ -1502,61 +1491,51 @@ impl Room { }) } - pub fn toggle_mute(&mut self, cx: &mut ModelContext) -> Result>> { - let should_mute = !self.is_muted(cx); + pub fn toggle_mute(&mut self, cx: &mut ModelContext) { if let Some(live_kit) = self.live_kit.as_mut() { - if matches!(live_kit.microphone_track, LocalTrack::None) { - return Ok(self.share_microphone(cx)); + // When unmuting, undeafen if the user was deafened before. + let was_deafened = live_kit.deafened; + if live_kit.muted_by_user + || live_kit.deafened + || matches!(live_kit.microphone_track, LocalTrack::None) + { + live_kit.muted_by_user = false; + live_kit.deafened = false; + } else { + live_kit.muted_by_user = true; } + let muted = live_kit.muted_by_user; + let should_undeafen = was_deafened && !live_kit.deafened; - let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?; - live_kit.muted_by_user = should_mute; + if let Some(task) = self.set_mute(muted, cx) { + task.detach_and_log_err(cx); + } - if old_muted == true && live_kit.deafened == true { - if let Some(task) = self.toggle_deafen(cx).ok() { - task.detach(); + if should_undeafen { + if let Some(task) = self.set_deafened(false, cx) { + task.detach_and_log_err(cx); } } - - Ok(ret_task) - } else { - Err(anyhow!("LiveKit not started")) } } - pub fn toggle_deafen(&mut self, cx: &mut ModelContext) -> Result>> { + pub fn toggle_deafen(&mut self, cx: &mut ModelContext) { if let Some(live_kit) = self.live_kit.as_mut() { - (*live_kit).deafened = !live_kit.deafened; - - let mut tasks = Vec::with_capacity(self.remote_participants.len()); - // Context notification is sent within set_mute itself. - let mut mute_task = None; - // When deafening, mute user's mic as well. - // When undeafening, unmute user's mic unless it was manually muted prior to deafening. - if live_kit.deafened || !live_kit.muted_by_user { - mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0); - }; - for participant in self.remote_participants.values() { - for track in live_kit - .room - .remote_audio_track_publications(&participant.user.id.to_string()) - { - let deafened = live_kit.deafened; - tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened))); - } + // When deafening, mute the microphone if it was not already muted. + // When un-deafening, unmute the microphone, unless it was explicitly muted. + let deafened = !live_kit.deafened; + live_kit.deafened = deafened; + let should_change_mute = !live_kit.muted_by_user; + + if let Some(task) = self.set_deafened(deafened, cx) { + task.detach_and_log_err(cx); } - Ok(cx.foreground_executor().spawn(async move { - if let Some(mute_task) = mute_task { - mute_task.await?; + if should_change_mute { + if let Some(task) = self.set_mute(deafened, cx) { + task.detach_and_log_err(cx); } - for task in tasks { - task.await?; - } - Ok(()) - })) - } else { - Err(anyhow!("LiveKit not started")) + } } } @@ -1587,6 +1566,70 @@ impl Room { } } + fn set_deafened( + &mut self, + deafened: bool, + cx: &mut ModelContext, + ) -> Option>> { + let live_kit = self.live_kit.as_mut()?; + cx.notify(); + + let mut track_updates = Vec::new(); + for participant in self.remote_participants.values() { + for publication in live_kit + .room + .remote_audio_track_publications(&participant.user.id.to_string()) + { + track_updates.push(publication.set_enabled(!deafened)); + } + + for track in participant.audio_tracks.values() { + if deafened { + track.stop(); + } else { + track.start(); + } + } + } + + Some(cx.foreground_executor().spawn(async move { + for result in futures::future::join_all(track_updates).await { + result?; + } + Ok(()) + })) + } + + fn set_mute( + &mut self, + should_mute: bool, + cx: &mut ModelContext, + ) -> Option>> { + let live_kit = self.live_kit.as_mut()?; + cx.notify(); + + if should_mute { + Audio::play_sound(Sound::Mute, cx); + } else { + Audio::play_sound(Sound::Unmute, cx); + } + + match &mut live_kit.microphone_track { + LocalTrack::None => { + if should_mute { + None + } else { + Some(self.share_microphone(cx)) + } + } + LocalTrack::Pending { .. } => None, + LocalTrack::Published { track_publication } => Some( + cx.foreground_executor() + .spawn(track_publication.set_mute(should_mute)), + ), + } + } + #[cfg(any(test, feature = "test-support"))] pub fn set_display_sources(&self, sources: Vec) { self.live_kit @@ -1611,50 +1654,6 @@ struct LiveKitRoom { } impl LiveKitRoom { - fn set_mute( - self: &mut LiveKitRoom, - should_mute: bool, - cx: &mut ModelContext, - ) -> Result<(Task>, bool)> { - if !should_mute { - // clear user muting state. - self.muted_by_user = false; - } - - let (result, old_muted) = match &mut self.microphone_track { - LocalTrack::None => Err(anyhow!("microphone was not shared")), - LocalTrack::Pending { muted, .. } => { - let old_muted = *muted; - *muted = should_mute; - cx.notify(); - Ok((Task::Ready(Some(Ok(()))), old_muted)) - } - LocalTrack::Published { - track_publication, - muted, - } => { - let old_muted = *muted; - *muted = should_mute; - cx.notify(); - Ok(( - cx.background_executor() - .spawn(track_publication.set_mute(*muted)), - old_muted, - )) - } - }?; - - if old_muted != should_mute { - if should_mute { - Audio::play_sound(Sound::Mute, cx); - } else { - Audio::play_sound(Sound::Unmute, cx); - } - } - - Ok((result, old_muted)) - } - fn stop_publishing(&mut self, cx: &mut ModelContext) { if let LocalTrack::Published { track_publication, .. @@ -1678,11 +1677,9 @@ enum LocalTrack { None, Pending { publish_id: usize, - muted: bool, }, Published { track_publication: LocalTrackPublication, - muted: bool, }, } diff --git a/crates/collab/src/tests/channel_guest_tests.rs b/crates/collab/src/tests/channel_guest_tests.rs index d5933235926fa1da8c1e8538dedd2a7506504cd8..f3326cd6922b7482f6c9d953a5a57a89676b717d 100644 --- a/crates/collab/src/tests/channel_guest_tests.rs +++ b/crates/collab/src/tests/channel_guest_tests.rs @@ -57,7 +57,7 @@ async fn test_channel_guests( }) .await .is_err()); - assert!(room_b.read_with(cx_b, |room, _| !room.is_sharing_mic())); + assert!(room_b.read_with(cx_b, |room, _| room.is_muted())); } #[gpui::test] @@ -104,6 +104,7 @@ async fn test_channel_guest_promotion(cx_a: &mut TestAppContext, cx_b: &mut Test }); assert!(project_b.read_with(cx_b, |project, _| project.is_read_only())); assert!(editor_b.update(cx_b, |e, cx| e.read_only(cx))); + assert!(room_b.read_with(cx_b, |room, _| room.read_only())); assert!(room_b .update(cx_b, |room, cx| room.share_microphone(cx)) .await @@ -127,10 +128,13 @@ async fn test_channel_guest_promotion(cx_a: &mut TestAppContext, cx_b: &mut Test // project and buffers are now editable assert!(project_b.read_with(cx_b, |project, _| !project.is_read_only())); assert!(editor_b.update(cx_b, |editor, cx| !editor.read_only(cx))); - room_b - .update(cx_b, |room, cx| room.share_microphone(cx)) - .await - .unwrap(); + + // B sees themselves as muted, and can unmute. + assert!(room_b.read_with(cx_b, |room, _| !room.read_only())); + room_b.read_with(cx_b, |room, _| assert!(room.is_muted())); + room_b.update(cx_b, |room, cx| room.toggle_mute(cx)); + cx_a.run_until_parked(); + room_b.read_with(cx_b, |room, _| assert!(!room.is_muted())); // B is demoted active_call_a diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index cedc841527ff5c4c40d2630dc9c15166ccec46d8..e68fd10d8d16b29300c3fa658596468df06be880 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -1876,6 +1876,186 @@ fn active_call_events(cx: &mut TestAppContext) -> Rc>> events } +#[gpui::test] +async fn test_mute_deafen( + executor: BackgroundExecutor, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, +) { + let mut server = TestServer::start(executor.clone()).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + + server + .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) + .await; + + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); + + // User A calls user B, B answers. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + executor.run_until_parked(); + active_call_b + .update(cx_b, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + executor.run_until_parked(); + + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + + room_a.read_with(cx_a, |room, _| assert!(!room.is_muted())); + room_b.read_with(cx_b, |room, _| assert!(!room.is_muted())); + + // Users A and B are both muted. + assert_eq!( + participant_audio_state(&room_a, cx_a), + &[ParticipantAudioState { + user_id: client_b.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![true], + }] + ); + assert_eq!( + participant_audio_state(&room_b, cx_b), + &[ParticipantAudioState { + user_id: client_a.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![true], + }] + ); + + // User A mutes + room_a.update(cx_a, |room, cx| room.toggle_mute(cx)); + executor.run_until_parked(); + + // User A hears user B, but B doesn't hear A. + room_a.read_with(cx_a, |room, _| assert!(room.is_muted())); + room_b.read_with(cx_b, |room, _| assert!(!room.is_muted())); + assert_eq!( + participant_audio_state(&room_a, cx_a), + &[ParticipantAudioState { + user_id: client_b.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![true], + }] + ); + assert_eq!( + participant_audio_state(&room_b, cx_b), + &[ParticipantAudioState { + user_id: client_a.user_id().unwrap(), + is_muted: true, + audio_tracks_playing: vec![true], + }] + ); + + // User A deafens + room_a.update(cx_a, |room, cx| room.toggle_deafen(cx)); + executor.run_until_parked(); + + // User A does not hear user B. + room_a.read_with(cx_a, |room, _| assert!(room.is_muted())); + room_b.read_with(cx_b, |room, _| assert!(!room.is_muted())); + assert_eq!( + participant_audio_state(&room_a, cx_a), + &[ParticipantAudioState { + user_id: client_b.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![false], + }] + ); + assert_eq!( + participant_audio_state(&room_b, cx_b), + &[ParticipantAudioState { + user_id: client_a.user_id().unwrap(), + is_muted: true, + audio_tracks_playing: vec![true], + }] + ); + + // User B calls user C, C joins. + active_call_b + .update(cx_b, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + executor.run_until_parked(); + active_call_c + .update(cx_c, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + executor.run_until_parked(); + + // User A does not hear users B or C. + assert_eq!( + participant_audio_state(&room_a, cx_a), + &[ + ParticipantAudioState { + user_id: client_b.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![false], + }, + ParticipantAudioState { + user_id: client_c.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![false], + } + ] + ); + assert_eq!( + participant_audio_state(&room_b, cx_b), + &[ + ParticipantAudioState { + user_id: client_a.user_id().unwrap(), + is_muted: true, + audio_tracks_playing: vec![true], + }, + ParticipantAudioState { + user_id: client_c.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![true], + } + ] + ); + + #[derive(PartialEq, Eq, Debug)] + struct ParticipantAudioState { + user_id: u64, + is_muted: bool, + audio_tracks_playing: Vec, + } + + fn participant_audio_state( + room: &Model, + cx: &TestAppContext, + ) -> Vec { + room.read_with(cx, |room, _| { + room.remote_participants() + .iter() + .map(|(user_id, participant)| ParticipantAudioState { + user_id: *user_id, + is_muted: participant.muted, + audio_tracks_playing: participant + .audio_tracks + .values() + .map(|track| track.is_playing()) + .collect(), + }) + .collect::>() + }) + } +} + #[gpui::test(iterations = 10)] async fn test_room_location( executor: BackgroundExecutor, diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index 4fcf6aa67652fabe9187ba5f78b8899379dc471b..cda0621cb32385a399fdfdaef51821dd531281b2 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -248,7 +248,6 @@ impl TestServer { language::init(cx); editor::init(cx); workspace::init(app_state.clone(), cx); - audio::init((), cx); call::init(client.clone(), user_store.clone(), cx); channel::init(&client, user_store.clone(), cx); notifications::init(client.clone(), user_store, cx); diff --git a/crates/collab_ui/src/collab_titlebar_item.rs b/crates/collab_ui/src/collab_titlebar_item.rs index e4ab39698f92118102bad10b154f8b09b0c14bc6..432f8f6cd242b3b7bbeb53464de98980b4079e47 100644 --- a/crates/collab_ui/src/collab_titlebar_item.rs +++ b/crates/collab_ui/src/collab_titlebar_item.rs @@ -102,7 +102,7 @@ impl Render for CollabTitlebarItem { peer_id, true, room.is_speaking(), - room.is_muted(cx), + room.is_muted(), &room, project_id, ¤t_user, @@ -168,7 +168,7 @@ impl Render for CollabTitlebarItem { let project = self.project.read(cx); let is_local = project.is_local(); let is_shared = is_local && project.is_shared(); - let is_muted = room.is_muted(cx); + let is_muted = room.is_muted(); let is_deafened = room.is_deafened().unwrap_or(false); let is_screen_sharing = room.is_screen_sharing(); let read_only = room.read_only(); diff --git a/crates/collab_ui/src/collab_ui.rs b/crates/collab_ui/src/collab_ui.rs index 779fd121f8afbfa59eac556c17e7abba605e8eee..f4248506781a4dca1dbe23c59fd67182a59f392f 100644 --- a/crates/collab_ui/src/collab_ui.rs +++ b/crates/collab_ui/src/collab_ui.rs @@ -9,7 +9,7 @@ mod panel_settings; use std::{rc::Rc, sync::Arc}; -use call::{report_call_event_for_room, ActiveCall, Room}; +use call::{report_call_event_for_room, ActiveCall}; pub use collab_panel::CollabPanel; pub use collab_titlebar_item::CollabTitlebarItem; use feature_flags::{ChannelsAlpha, FeatureFlagAppExt}; @@ -21,7 +21,6 @@ pub use panel_settings::{ ChatPanelSettings, CollaborationPanelSettings, NotificationPanelSettings, }; use settings::Settings; -use util::ResultExt; use workspace::AppState; actions!( @@ -75,7 +74,7 @@ pub fn toggle_mute(_: &ToggleMute, cx: &mut AppContext) { if let Some(room) = call.room().cloned() { let client = call.client(); room.update(cx, |room, cx| { - let operation = if room.is_muted(cx) { + let operation = if room.is_muted() { "enable microphone" } else { "disable microphone" @@ -83,17 +82,13 @@ pub fn toggle_mute(_: &ToggleMute, cx: &mut AppContext) { report_call_event_for_room(operation, room.id(), room.channel_id(), &client); room.toggle_mute(cx) - }) - .map(|task| task.detach_and_log_err(cx)) - .log_err(); + }); } } pub fn toggle_deafen(_: &ToggleDeafen, cx: &mut AppContext) { if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() { - room.update(cx, Room::toggle_deafen) - .map(|task| task.detach_and_log_err(cx)) - .log_err(); + room.update(cx, |room, cx| room.toggle_deafen(cx)); } } diff --git a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift index db5da8e0e9ec5608e81fe34f3aa901d2e819f21d..7468c08791f8b0782cf711cb9b546bf2940ae6fe 100644 --- a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift +++ b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -286,6 +286,18 @@ public func LKRemoteAudioTrackGetSid(track: UnsafeRawPointer) -> CFString { return track.sid! as CFString } +@_cdecl("LKRemoteAudioTrackStart") +public func LKRemoteAudioTrackStart(track: UnsafeRawPointer) { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + track.start() +} + +@_cdecl("LKRemoteAudioTrackStop") +public func LKRemoteAudioTrackStop(track: UnsafeRawPointer) { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + track.stop() +} + @_cdecl("LKDisplaySources") public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) { MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index f1660cc3d1a7ce9f7951cfd5f1a354158a41d334..a4bd9d4f07b6c88f3b7fe7dca343a972659d975f 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -18,8 +18,6 @@ use std::{ sync::{Arc, Weak}, }; -// SAFETY: Most live kit types are threadsafe: -// https://github.com/livekit/client-sdk-swift#thread-safety macro_rules! pointer_type { ($pointer_name:ident) => { #[repr(transparent)] @@ -134,8 +132,10 @@ extern "C" { ) -> *const c_void; fn LKRemoteAudioTrackGetSid(track: swift::RemoteAudioTrack) -> CFStringRef; - fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void); fn LKRemoteVideoTrackGetSid(track: swift::RemoteVideoTrack) -> CFStringRef; + fn LKRemoteAudioTrackStart(track: swift::RemoteAudioTrack); + fn LKRemoteAudioTrackStop(track: swift::RemoteAudioTrack); + fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void); fn LKDisplaySources( callback_data: *mut c_void, @@ -853,12 +853,12 @@ impl RemoteAudioTrack { &self.publisher_id } - pub fn enable(&self) -> impl Future> { - async { Ok(()) } + pub fn start(&self) { + unsafe { LKRemoteAudioTrackStart(self.native_track) } } - pub fn disable(&self) -> impl Future> { - async { Ok(()) } + pub fn stop(&self) { + unsafe { LKRemoteAudioTrackStop(self.native_track) } } } diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 0716042ff196e1e0ea8f3543f03b645648ed473c..96ca2b90dcd5de645b927358e2ff2e2779592003 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -1,7 +1,7 @@ use crate::{ConnectionState, RoomUpdate, Sid}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; -use collections::{BTreeMap, HashMap}; +use collections::{BTreeMap, HashMap, HashSet}; use futures::Stream; use gpui::BackgroundExecutor; use live_kit_server::{proto, token}; @@ -13,7 +13,7 @@ use std::{ mem, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, - Arc, + Arc, Weak, }, }; @@ -113,7 +113,25 @@ impl TestServer { .0 .lock() .updates_tx - .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) + .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new( + RemoteVideoTrack { + server_track: track.clone(), + }, + ))) + .unwrap(); + } + for track in &room.audio_tracks { + client_room + .0 + .lock() + .updates_tx + .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack( + Arc::new(RemoteAudioTrack { + server_track: track.clone(), + room: Arc::downgrade(&client_room), + }), + Arc::new(RemoteTrackPublication), + )) .unwrap(); } room.client_rooms.insert(identity, client_room); @@ -210,7 +228,7 @@ impl TestServer { } let sid = nanoid::nanoid!(17); - let track = Arc::new(RemoteVideoTrack { + let track = Arc::new(TestServerVideoTrack { sid: sid.clone(), publisher_id: identity.clone(), frames_rx: local_track.frames_rx.clone(), @@ -224,7 +242,11 @@ impl TestServer { .0 .lock() .updates_tx - .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) + .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new( + RemoteVideoTrack { + server_track: track.clone(), + }, + ))) .unwrap(); } } @@ -259,9 +281,10 @@ impl TestServer { } let sid = nanoid::nanoid!(17); - let track = Arc::new(RemoteAudioTrack { + let track = Arc::new(TestServerAudioTrack { sid: sid.clone(), publisher_id: identity.clone(), + muted: AtomicBool::new(false), }); let publication = Arc::new(RemoteTrackPublication); @@ -275,7 +298,10 @@ impl TestServer { .lock() .updates_tx .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack( - track.clone(), + Arc::new(RemoteAudioTrack { + server_track: track.clone(), + room: Arc::downgrade(&client_room), + }), publication.clone(), )) .unwrap(); @@ -285,37 +311,123 @@ impl TestServer { Ok(sid) } + fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> { + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let room_name = claims.video.room.unwrap(); + let identity = claims.sub.unwrap(); + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + if let Some(track) = room + .audio_tracks + .iter_mut() + .find(|track| track.sid == track_sid) + { + track.muted.store(muted, SeqCst); + for (id, client_room) in room.client_rooms.iter() { + if *id != identity { + client_room + .0 + .lock() + .updates_tx + .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged { + track_id: track_sid.to_string(), + muted, + }) + .unwrap(); + } + } + } + Ok(()) + } + + fn is_track_muted(&self, token: &str, track_sid: &str) -> Option { + let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?; + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms.get_mut(&*room_name)?; + room.audio_tracks.iter().find_map(|track| { + if track.sid == track_sid { + Some(track.muted.load(SeqCst)) + } else { + None + } + }) + } + fn video_tracks(&self, token: String) -> Result>> { let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let room_name = claims.video.room.unwrap(); + let identity = claims.sub.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - Ok(room.video_tracks.clone()) + room.client_rooms + .get(identity.as_ref()) + .ok_or_else(|| anyhow!("not a participant in room"))?; + Ok(room + .video_tracks + .iter() + .map(|track| { + Arc::new(RemoteVideoTrack { + server_track: track.clone(), + }) + }) + .collect()) } fn audio_tracks(&self, token: String) -> Result>> { let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let room_name = claims.video.room.unwrap(); + let identity = claims.sub.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - Ok(room.audio_tracks.clone()) + let client_room = room + .client_rooms + .get(identity.as_ref()) + .ok_or_else(|| anyhow!("not a participant in room"))?; + Ok(room + .audio_tracks + .iter() + .map(|track| { + Arc::new(RemoteAudioTrack { + server_track: track.clone(), + room: Arc::downgrade(&client_room), + }) + }) + .collect()) } } #[derive(Default)] struct TestServerRoom { client_rooms: HashMap>, - video_tracks: Vec>, - audio_tracks: Vec>, + video_tracks: Vec>, + audio_tracks: Vec>, participant_permissions: HashMap, } +#[derive(Debug)] +struct TestServerVideoTrack { + sid: Sid, + publisher_id: Sid, + frames_rx: async_broadcast::Receiver, +} + +#[derive(Debug)] +struct TestServerAudioTrack { + sid: Sid, + publisher_id: Sid, + muted: AtomicBool, +} + impl TestServerRoom {} pub struct TestApiClient { @@ -386,6 +498,7 @@ struct RoomState { watch::Receiver, ), display_sources: Vec, + paused_audio_tracks: HashSet, updates_tx: async_broadcast::Sender, updates_rx: async_broadcast::Receiver, } @@ -398,6 +511,7 @@ impl Room { Arc::new(Self(Mutex::new(RoomState { connection: watch::channel_with(ConnectionState::Disconnected), display_sources: Default::default(), + paused_audio_tracks: Default::default(), updates_tx, updates_rx, }))) @@ -443,11 +557,12 @@ impl Room { .publish_video_track(this.token(), track) .await?; Ok(LocalTrackPublication { - muted: Default::default(), + room: Arc::downgrade(&this), sid, }) } } + pub fn publish_audio_track( self: &Arc, track: LocalAudioTrack, @@ -460,7 +575,7 @@ impl Room { .publish_audio_track(this.token(), &track) .await?; Ok(LocalTrackPublication { - muted: Default::default(), + room: Arc::downgrade(&this), sid, }) } @@ -560,20 +675,31 @@ impl Drop for Room { #[derive(Clone)] pub struct LocalTrackPublication { sid: String, - muted: Arc, + room: Weak, } impl LocalTrackPublication { pub fn set_mute(&self, mute: bool) -> impl Future> { - let muted = self.muted.clone(); + let sid = self.sid.clone(); + let room = self.room.clone(); async move { - muted.store(mute, SeqCst); - Ok(()) + if let Some(room) = room.upgrade() { + room.test_server() + .set_track_muted(&room.token(), &sid, mute) + } else { + Err(anyhow!("no such room")) + } } } pub fn is_muted(&self) -> bool { - self.muted.load(SeqCst) + if let Some(room) = self.room.upgrade() { + room.test_server() + .is_track_muted(&room.token(), &self.sid) + .unwrap_or(false) + } else { + false + } } pub fn sid(&self) -> String { @@ -621,46 +747,65 @@ impl LocalAudioTrack { #[derive(Debug)] pub struct RemoteVideoTrack { - sid: Sid, - publisher_id: Sid, - frames_rx: async_broadcast::Receiver, + server_track: Arc, } impl RemoteVideoTrack { pub fn sid(&self) -> &str { - &self.sid + &self.server_track.sid } pub fn publisher_id(&self) -> &str { - &self.publisher_id + &self.server_track.publisher_id } pub fn frames(&self) -> async_broadcast::Receiver { - self.frames_rx.clone() + self.server_track.frames_rx.clone() } } #[derive(Debug)] pub struct RemoteAudioTrack { - sid: Sid, - publisher_id: Sid, + server_track: Arc, + room: Weak, } impl RemoteAudioTrack { pub fn sid(&self) -> &str { - &self.sid + &self.server_track.sid } pub fn publisher_id(&self) -> &str { - &self.publisher_id + &self.server_track.publisher_id } - pub fn enable(&self) -> impl Future> { - async { Ok(()) } + pub fn start(&self) { + if let Some(room) = self.room.upgrade() { + room.0 + .lock() + .paused_audio_tracks + .remove(&self.server_track.sid); + } } - pub fn disable(&self) -> impl Future> { - async { Ok(()) } + pub fn stop(&self) { + if let Some(room) = self.room.upgrade() { + room.0 + .lock() + .paused_audio_tracks + .insert(self.server_track.sid.clone()); + } + } + + pub fn is_playing(&self) -> bool { + !self + .room + .upgrade() + .unwrap() + .0 + .lock() + .paused_audio_tracks + .contains(&self.server_track.sid) } }