Detailed changes
@@ -155,17 +155,17 @@ impl Room {
let connect = room.connect(&connection_info.server_url, &connection_info.token);
cx.spawn(|this, mut cx| async move {
connect.await?;
-
- let is_read_only = this
- .update(&mut cx, |room, _| room.read_only())
- .unwrap_or(true);
-
- if !cx.update(|cx| Self::mute_on_join(cx))? && !is_read_only {
- this.update(&mut cx, |this, cx| this.share_microphone(cx))?
- .await?;
- }
-
- anyhow::Ok(())
+ this.update(&mut cx, |this, cx| {
+ if !this.read_only() {
+ if let Some(live_kit) = &this.live_kit {
+ if !live_kit.muted_by_user && !live_kit.deafened {
+ return this.share_microphone(cx);
+ }
+ }
+ }
+ Task::ready(Ok(()))
+ })?
+ .await
})
.detach_and_log_err(cx);
@@ -174,7 +174,7 @@ impl Room {
screen_track: LocalTrack::None,
microphone_track: LocalTrack::None,
next_publish_id: 0,
- muted_by_user: false,
+ muted_by_user: Self::mute_on_join(cx),
deafened: false,
speaking: false,
_maintain_room,
@@ -1047,6 +1047,15 @@ impl Room {
}
RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
+ if let Some(live_kit) = &self.live_kit {
+ if live_kit.deafened {
+ track.stop();
+ cx.foreground_executor()
+ .spawn(publication.set_enabled(false))
+ .detach();
+ }
+ }
+
let user_id = track.publisher_id().parse()?;
let track_id = track.sid().to_string();
let participant = self
@@ -1301,15 +1310,12 @@ impl Room {
})
}
- pub fn is_muted(&self, cx: &AppContext) -> bool {
- self.live_kit
- .as_ref()
- .and_then(|live_kit| match &live_kit.microphone_track {
- LocalTrack::None => Some(Self::mute_on_join(cx)),
- LocalTrack::Pending { muted, .. } => Some(*muted),
- LocalTrack::Published { muted, .. } => Some(*muted),
- })
- .unwrap_or(false)
+ pub fn is_muted(&self) -> bool {
+ self.live_kit.as_ref().map_or(false, |live_kit| {
+ matches!(live_kit.microphone_track, LocalTrack::None)
+ || live_kit.muted_by_user
+ || live_kit.deafened
+ })
}
pub fn read_only(&self) -> bool {
@@ -1331,16 +1337,11 @@ impl Room {
pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
if self.status.is_offline() {
return Task::ready(Err(anyhow!("room is offline")));
- } else if self.is_sharing_mic() {
- return Task::ready(Err(anyhow!("microphone was already shared")));
}
let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
let publish_id = post_inc(&mut live_kit.next_publish_id);
- live_kit.microphone_track = LocalTrack::Pending {
- publish_id,
- muted: false,
- };
+ live_kit.microphone_track = LocalTrack::Pending { publish_id };
cx.notify();
publish_id
} else {
@@ -1369,14 +1370,13 @@ impl Room {
.as_mut()
.ok_or_else(|| anyhow!("live-kit was not initialized"))?;
- let (canceled, muted) = if let LocalTrack::Pending {
+ let canceled = if let LocalTrack::Pending {
publish_id: cur_publish_id,
- muted,
} = &live_kit.microphone_track
{
- (*cur_publish_id != publish_id, *muted)
+ *cur_publish_id != publish_id
} else {
- (true, false)
+ true
};
match publication {
@@ -1384,14 +1384,13 @@ impl Room {
if canceled {
live_kit.room.unpublish_track(publication);
} else {
- if muted {
+ if live_kit.muted_by_user || live_kit.deafened {
cx.background_executor()
- .spawn(publication.set_mute(muted))
+ .spawn(publication.set_mute(true))
.detach();
}
live_kit.microphone_track = LocalTrack::Published {
track_publication: publication,
- muted,
};
cx.notify();
}
@@ -1420,10 +1419,7 @@ impl Room {
let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
let publish_id = post_inc(&mut live_kit.next_publish_id);
- live_kit.screen_track = LocalTrack::Pending {
- publish_id,
- muted: false,
- };
+ live_kit.screen_track = LocalTrack::Pending { publish_id };
cx.notify();
(live_kit.room.display_sources(), publish_id)
} else {
@@ -1457,14 +1453,13 @@ impl Room {
.as_mut()
.ok_or_else(|| anyhow!("live-kit was not initialized"))?;
- let (canceled, muted) = if let LocalTrack::Pending {
+ let canceled = if let LocalTrack::Pending {
publish_id: cur_publish_id,
- muted,
} = &live_kit.screen_track
{
- (*cur_publish_id != publish_id, *muted)
+ *cur_publish_id != publish_id
} else {
- (true, false)
+ true
};
match publication {
@@ -1472,14 +1467,8 @@ impl Room {
if canceled {
live_kit.room.unpublish_track(publication);
} else {
- if muted {
- cx.background_executor()
- .spawn(publication.set_mute(muted))
- .detach();
- }
live_kit.screen_track = LocalTrack::Published {
track_publication: publication,
- muted,
};
cx.notify();
}
@@ -1502,61 +1491,51 @@ impl Room {
})
}
- pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
- let should_mute = !self.is_muted(cx);
+ pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) {
if let Some(live_kit) = self.live_kit.as_mut() {
- if matches!(live_kit.microphone_track, LocalTrack::None) {
- return Ok(self.share_microphone(cx));
+ // When unmuting, undeafen if the user was deafened before.
+ let was_deafened = live_kit.deafened;
+ if live_kit.muted_by_user
+ || live_kit.deafened
+ || matches!(live_kit.microphone_track, LocalTrack::None)
+ {
+ live_kit.muted_by_user = false;
+ live_kit.deafened = false;
+ } else {
+ live_kit.muted_by_user = true;
}
+ let muted = live_kit.muted_by_user;
+ let should_undeafen = was_deafened && !live_kit.deafened;
- let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
- live_kit.muted_by_user = should_mute;
+ if let Some(task) = self.set_mute(muted, cx) {
+ task.detach_and_log_err(cx);
+ }
- if old_muted == true && live_kit.deafened == true {
- if let Some(task) = self.toggle_deafen(cx).ok() {
- task.detach();
+ if should_undeafen {
+ if let Some(task) = self.set_deafened(false, cx) {
+ task.detach_and_log_err(cx);
}
}
-
- Ok(ret_task)
- } else {
- Err(anyhow!("LiveKit not started"))
}
}
- pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
+ pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) {
if let Some(live_kit) = self.live_kit.as_mut() {
- (*live_kit).deafened = !live_kit.deafened;
-
- let mut tasks = Vec::with_capacity(self.remote_participants.len());
- // Context notification is sent within set_mute itself.
- let mut mute_task = None;
- // When deafening, mute user's mic as well.
- // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
- if live_kit.deafened || !live_kit.muted_by_user {
- mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
- };
- for participant in self.remote_participants.values() {
- for track in live_kit
- .room
- .remote_audio_track_publications(&participant.user.id.to_string())
- {
- let deafened = live_kit.deafened;
- tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
- }
+ // When deafening, mute the microphone if it was not already muted.
+ // When un-deafening, unmute the microphone, unless it was explicitly muted.
+ let deafened = !live_kit.deafened;
+ live_kit.deafened = deafened;
+ let should_change_mute = !live_kit.muted_by_user;
+
+ if let Some(task) = self.set_deafened(deafened, cx) {
+ task.detach_and_log_err(cx);
}
- Ok(cx.foreground_executor().spawn(async move {
- if let Some(mute_task) = mute_task {
- mute_task.await?;
+ if should_change_mute {
+ if let Some(task) = self.set_mute(deafened, cx) {
+ task.detach_and_log_err(cx);
}
- for task in tasks {
- task.await?;
- }
- Ok(())
- }))
- } else {
- Err(anyhow!("LiveKit not started"))
+ }
}
}
@@ -1587,6 +1566,70 @@ impl Room {
}
}
+ fn set_deafened(
+ &mut self,
+ deafened: bool,
+ cx: &mut ModelContext<Self>,
+ ) -> Option<Task<Result<()>>> {
+ let live_kit = self.live_kit.as_mut()?;
+ cx.notify();
+
+ let mut track_updates = Vec::new();
+ for participant in self.remote_participants.values() {
+ for publication in live_kit
+ .room
+ .remote_audio_track_publications(&participant.user.id.to_string())
+ {
+ track_updates.push(publication.set_enabled(!deafened));
+ }
+
+ for track in participant.audio_tracks.values() {
+ if deafened {
+ track.stop();
+ } else {
+ track.start();
+ }
+ }
+ }
+
+ Some(cx.foreground_executor().spawn(async move {
+ for result in futures::future::join_all(track_updates).await {
+ result?;
+ }
+ Ok(())
+ }))
+ }
+
+ fn set_mute(
+ &mut self,
+ should_mute: bool,
+ cx: &mut ModelContext<Room>,
+ ) -> Option<Task<Result<()>>> {
+ let live_kit = self.live_kit.as_mut()?;
+ cx.notify();
+
+ if should_mute {
+ Audio::play_sound(Sound::Mute, cx);
+ } else {
+ Audio::play_sound(Sound::Unmute, cx);
+ }
+
+ match &mut live_kit.microphone_track {
+ LocalTrack::None => {
+ if should_mute {
+ None
+ } else {
+ Some(self.share_microphone(cx))
+ }
+ }
+ LocalTrack::Pending { .. } => None,
+ LocalTrack::Published { track_publication } => Some(
+ cx.foreground_executor()
+ .spawn(track_publication.set_mute(should_mute)),
+ ),
+ }
+ }
+
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
self.live_kit
@@ -1611,50 +1654,6 @@ struct LiveKitRoom {
}
impl LiveKitRoom {
- fn set_mute(
- self: &mut LiveKitRoom,
- should_mute: bool,
- cx: &mut ModelContext<Room>,
- ) -> Result<(Task<Result<()>>, bool)> {
- if !should_mute {
- // clear user muting state.
- self.muted_by_user = false;
- }
-
- let (result, old_muted) = match &mut self.microphone_track {
- LocalTrack::None => Err(anyhow!("microphone was not shared")),
- LocalTrack::Pending { muted, .. } => {
- let old_muted = *muted;
- *muted = should_mute;
- cx.notify();
- Ok((Task::Ready(Some(Ok(()))), old_muted))
- }
- LocalTrack::Published {
- track_publication,
- muted,
- } => {
- let old_muted = *muted;
- *muted = should_mute;
- cx.notify();
- Ok((
- cx.background_executor()
- .spawn(track_publication.set_mute(*muted)),
- old_muted,
- ))
- }
- }?;
-
- if old_muted != should_mute {
- if should_mute {
- Audio::play_sound(Sound::Mute, cx);
- } else {
- Audio::play_sound(Sound::Unmute, cx);
- }
- }
-
- Ok((result, old_muted))
- }
-
fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
if let LocalTrack::Published {
track_publication, ..
@@ -1678,11 +1677,9 @@ enum LocalTrack {
None,
Pending {
publish_id: usize,
- muted: bool,
},
Published {
track_publication: LocalTrackPublication,
- muted: bool,
},
}
@@ -57,7 +57,7 @@ async fn test_channel_guests(
})
.await
.is_err());
- assert!(room_b.read_with(cx_b, |room, _| !room.is_sharing_mic()));
+ assert!(room_b.read_with(cx_b, |room, _| room.is_muted()));
}
#[gpui::test]
@@ -104,6 +104,7 @@ async fn test_channel_guest_promotion(cx_a: &mut TestAppContext, cx_b: &mut Test
});
assert!(project_b.read_with(cx_b, |project, _| project.is_read_only()));
assert!(editor_b.update(cx_b, |e, cx| e.read_only(cx)));
+ assert!(room_b.read_with(cx_b, |room, _| room.read_only()));
assert!(room_b
.update(cx_b, |room, cx| room.share_microphone(cx))
.await
@@ -127,10 +128,13 @@ async fn test_channel_guest_promotion(cx_a: &mut TestAppContext, cx_b: &mut Test
// project and buffers are now editable
assert!(project_b.read_with(cx_b, |project, _| !project.is_read_only()));
assert!(editor_b.update(cx_b, |editor, cx| !editor.read_only(cx)));
- room_b
- .update(cx_b, |room, cx| room.share_microphone(cx))
- .await
- .unwrap();
+
+ // B sees themselves as muted, and can unmute.
+ assert!(room_b.read_with(cx_b, |room, _| !room.read_only()));
+ room_b.read_with(cx_b, |room, _| assert!(room.is_muted()));
+ room_b.update(cx_b, |room, cx| room.toggle_mute(cx));
+ cx_a.run_until_parked();
+ room_b.read_with(cx_b, |room, _| assert!(!room.is_muted()));
// B is demoted
active_call_a
@@ -1876,6 +1876,186 @@ fn active_call_events(cx: &mut TestAppContext) -> Rc<RefCell<Vec<room::Event>>>
events
}
+#[gpui::test]
+async fn test_mute_deafen(
+ executor: BackgroundExecutor,
+ cx_a: &mut TestAppContext,
+ cx_b: &mut TestAppContext,
+ cx_c: &mut TestAppContext,
+) {
+ let mut server = TestServer::start(executor.clone()).await;
+ let client_a = server.create_client(cx_a, "user_a").await;
+ let client_b = server.create_client(cx_b, "user_b").await;
+ let client_c = server.create_client(cx_c, "user_c").await;
+
+ server
+ .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)])
+ .await;
+
+ let active_call_a = cx_a.read(ActiveCall::global);
+ let active_call_b = cx_b.read(ActiveCall::global);
+ let active_call_c = cx_c.read(ActiveCall::global);
+
+ // User A calls user B, B answers.
+ active_call_a
+ .update(cx_a, |call, cx| {
+ call.invite(client_b.user_id().unwrap(), None, cx)
+ })
+ .await
+ .unwrap();
+ executor.run_until_parked();
+ active_call_b
+ .update(cx_b, |call, cx| call.accept_incoming(cx))
+ .await
+ .unwrap();
+ executor.run_until_parked();
+
+ let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
+ let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
+
+ room_a.read_with(cx_a, |room, _| assert!(!room.is_muted()));
+ room_b.read_with(cx_b, |room, _| assert!(!room.is_muted()));
+
+ // Users A and B are both muted.
+ assert_eq!(
+ participant_audio_state(&room_a, cx_a),
+ &[ParticipantAudioState {
+ user_id: client_b.user_id().unwrap(),
+ is_muted: false,
+ audio_tracks_playing: vec![true],
+ }]
+ );
+ assert_eq!(
+ participant_audio_state(&room_b, cx_b),
+ &[ParticipantAudioState {
+ user_id: client_a.user_id().unwrap(),
+ is_muted: false,
+ audio_tracks_playing: vec![true],
+ }]
+ );
+
+ // User A mutes
+ room_a.update(cx_a, |room, cx| room.toggle_mute(cx));
+ executor.run_until_parked();
+
+ // User A hears user B, but B doesn't hear A.
+ room_a.read_with(cx_a, |room, _| assert!(room.is_muted()));
+ room_b.read_with(cx_b, |room, _| assert!(!room.is_muted()));
+ assert_eq!(
+ participant_audio_state(&room_a, cx_a),
+ &[ParticipantAudioState {
+ user_id: client_b.user_id().unwrap(),
+ is_muted: false,
+ audio_tracks_playing: vec![true],
+ }]
+ );
+ assert_eq!(
+ participant_audio_state(&room_b, cx_b),
+ &[ParticipantAudioState {
+ user_id: client_a.user_id().unwrap(),
+ is_muted: true,
+ audio_tracks_playing: vec![true],
+ }]
+ );
+
+ // User A deafens
+ room_a.update(cx_a, |room, cx| room.toggle_deafen(cx));
+ executor.run_until_parked();
+
+ // User A does not hear user B.
+ room_a.read_with(cx_a, |room, _| assert!(room.is_muted()));
+ room_b.read_with(cx_b, |room, _| assert!(!room.is_muted()));
+ assert_eq!(
+ participant_audio_state(&room_a, cx_a),
+ &[ParticipantAudioState {
+ user_id: client_b.user_id().unwrap(),
+ is_muted: false,
+ audio_tracks_playing: vec![false],
+ }]
+ );
+ assert_eq!(
+ participant_audio_state(&room_b, cx_b),
+ &[ParticipantAudioState {
+ user_id: client_a.user_id().unwrap(),
+ is_muted: true,
+ audio_tracks_playing: vec![true],
+ }]
+ );
+
+ // User B calls user C, C joins.
+ active_call_b
+ .update(cx_b, |call, cx| {
+ call.invite(client_c.user_id().unwrap(), None, cx)
+ })
+ .await
+ .unwrap();
+ executor.run_until_parked();
+ active_call_c
+ .update(cx_c, |call, cx| call.accept_incoming(cx))
+ .await
+ .unwrap();
+ executor.run_until_parked();
+
+ // User A does not hear users B or C.
+ assert_eq!(
+ participant_audio_state(&room_a, cx_a),
+ &[
+ ParticipantAudioState {
+ user_id: client_b.user_id().unwrap(),
+ is_muted: false,
+ audio_tracks_playing: vec![false],
+ },
+ ParticipantAudioState {
+ user_id: client_c.user_id().unwrap(),
+ is_muted: false,
+ audio_tracks_playing: vec![false],
+ }
+ ]
+ );
+ assert_eq!(
+ participant_audio_state(&room_b, cx_b),
+ &[
+ ParticipantAudioState {
+ user_id: client_a.user_id().unwrap(),
+ is_muted: true,
+ audio_tracks_playing: vec![true],
+ },
+ ParticipantAudioState {
+ user_id: client_c.user_id().unwrap(),
+ is_muted: false,
+ audio_tracks_playing: vec![true],
+ }
+ ]
+ );
+
+ #[derive(PartialEq, Eq, Debug)]
+ struct ParticipantAudioState {
+ user_id: u64,
+ is_muted: bool,
+ audio_tracks_playing: Vec<bool>,
+ }
+
+ fn participant_audio_state(
+ room: &Model<Room>,
+ cx: &TestAppContext,
+ ) -> Vec<ParticipantAudioState> {
+ room.read_with(cx, |room, _| {
+ room.remote_participants()
+ .iter()
+ .map(|(user_id, participant)| ParticipantAudioState {
+ user_id: *user_id,
+ is_muted: participant.muted,
+ audio_tracks_playing: participant
+ .audio_tracks
+ .values()
+ .map(|track| track.is_playing())
+ .collect(),
+ })
+ .collect::<Vec<_>>()
+ })
+ }
+}
+
#[gpui::test(iterations = 10)]
async fn test_room_location(
executor: BackgroundExecutor,
@@ -248,7 +248,6 @@ impl TestServer {
language::init(cx);
editor::init(cx);
workspace::init(app_state.clone(), cx);
- audio::init((), cx);
call::init(client.clone(), user_store.clone(), cx);
channel::init(&client, user_store.clone(), cx);
notifications::init(client.clone(), user_store, cx);
@@ -102,7 +102,7 @@ impl Render for CollabTitlebarItem {
peer_id,
true,
room.is_speaking(),
- room.is_muted(cx),
+ room.is_muted(),
&room,
project_id,
¤t_user,
@@ -168,7 +168,7 @@ impl Render for CollabTitlebarItem {
let project = self.project.read(cx);
let is_local = project.is_local();
let is_shared = is_local && project.is_shared();
- let is_muted = room.is_muted(cx);
+ let is_muted = room.is_muted();
let is_deafened = room.is_deafened().unwrap_or(false);
let is_screen_sharing = room.is_screen_sharing();
let read_only = room.read_only();
@@ -9,7 +9,7 @@ mod panel_settings;
use std::{rc::Rc, sync::Arc};
-use call::{report_call_event_for_room, ActiveCall, Room};
+use call::{report_call_event_for_room, ActiveCall};
pub use collab_panel::CollabPanel;
pub use collab_titlebar_item::CollabTitlebarItem;
use feature_flags::{ChannelsAlpha, FeatureFlagAppExt};
@@ -21,7 +21,6 @@ pub use panel_settings::{
ChatPanelSettings, CollaborationPanelSettings, NotificationPanelSettings,
};
use settings::Settings;
-use util::ResultExt;
use workspace::AppState;
actions!(
@@ -75,7 +74,7 @@ pub fn toggle_mute(_: &ToggleMute, cx: &mut AppContext) {
if let Some(room) = call.room().cloned() {
let client = call.client();
room.update(cx, |room, cx| {
- let operation = if room.is_muted(cx) {
+ let operation = if room.is_muted() {
"enable microphone"
} else {
"disable microphone"
@@ -83,17 +82,13 @@ pub fn toggle_mute(_: &ToggleMute, cx: &mut AppContext) {
report_call_event_for_room(operation, room.id(), room.channel_id(), &client);
room.toggle_mute(cx)
- })
- .map(|task| task.detach_and_log_err(cx))
- .log_err();
+ });
}
}
pub fn toggle_deafen(_: &ToggleDeafen, cx: &mut AppContext) {
if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() {
- room.update(cx, Room::toggle_deafen)
- .map(|task| task.detach_and_log_err(cx))
- .log_err();
+ room.update(cx, |room, cx| room.toggle_deafen(cx));
}
}
@@ -286,6 +286,18 @@ public func LKRemoteAudioTrackGetSid(track: UnsafeRawPointer) -> CFString {
return track.sid! as CFString
}
+@_cdecl("LKRemoteAudioTrackStart")
+public func LKRemoteAudioTrackStart(track: UnsafeRawPointer) {
+ let track = Unmanaged<RemoteAudioTrack>.fromOpaque(track).takeUnretainedValue()
+ track.start()
+}
+
+@_cdecl("LKRemoteAudioTrackStop")
+public func LKRemoteAudioTrackStop(track: UnsafeRawPointer) {
+ let track = Unmanaged<RemoteAudioTrack>.fromOpaque(track).takeUnretainedValue()
+ track.stop()
+}
+
@_cdecl("LKDisplaySources")
public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) {
MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in
@@ -18,8 +18,6 @@ use std::{
sync::{Arc, Weak},
};
-// SAFETY: Most live kit types are threadsafe:
-// https://github.com/livekit/client-sdk-swift#thread-safety
macro_rules! pointer_type {
($pointer_name:ident) => {
#[repr(transparent)]
@@ -134,8 +132,10 @@ extern "C" {
) -> *const c_void;
fn LKRemoteAudioTrackGetSid(track: swift::RemoteAudioTrack) -> CFStringRef;
- fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void);
fn LKRemoteVideoTrackGetSid(track: swift::RemoteVideoTrack) -> CFStringRef;
+ fn LKRemoteAudioTrackStart(track: swift::RemoteAudioTrack);
+ fn LKRemoteAudioTrackStop(track: swift::RemoteAudioTrack);
+ fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void);
fn LKDisplaySources(
callback_data: *mut c_void,
@@ -853,12 +853,12 @@ impl RemoteAudioTrack {
&self.publisher_id
}
- pub fn enable(&self) -> impl Future<Output = Result<()>> {
- async { Ok(()) }
+ pub fn start(&self) {
+ unsafe { LKRemoteAudioTrackStart(self.native_track) }
}
- pub fn disable(&self) -> impl Future<Output = Result<()>> {
- async { Ok(()) }
+ pub fn stop(&self) {
+ unsafe { LKRemoteAudioTrackStop(self.native_track) }
}
}
@@ -1,7 +1,7 @@
use crate::{ConnectionState, RoomUpdate, Sid};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
-use collections::{BTreeMap, HashMap};
+use collections::{BTreeMap, HashMap, HashSet};
use futures::Stream;
use gpui::BackgroundExecutor;
use live_kit_server::{proto, token};
@@ -13,7 +13,7 @@ use std::{
mem,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
- Arc,
+ Arc, Weak,
},
};
@@ -113,7 +113,25 @@ impl TestServer {
.0
.lock()
.updates_tx
- .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
+ .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
+ RemoteVideoTrack {
+ server_track: track.clone(),
+ },
+ )))
+ .unwrap();
+ }
+ for track in &room.audio_tracks {
+ client_room
+ .0
+ .lock()
+ .updates_tx
+ .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
+ Arc::new(RemoteAudioTrack {
+ server_track: track.clone(),
+ room: Arc::downgrade(&client_room),
+ }),
+ Arc::new(RemoteTrackPublication),
+ ))
.unwrap();
}
room.client_rooms.insert(identity, client_room);
@@ -210,7 +228,7 @@ impl TestServer {
}
let sid = nanoid::nanoid!(17);
- let track = Arc::new(RemoteVideoTrack {
+ let track = Arc::new(TestServerVideoTrack {
sid: sid.clone(),
publisher_id: identity.clone(),
frames_rx: local_track.frames_rx.clone(),
@@ -224,7 +242,11 @@ impl TestServer {
.0
.lock()
.updates_tx
- .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
+ .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
+ RemoteVideoTrack {
+ server_track: track.clone(),
+ },
+ )))
.unwrap();
}
}
@@ -259,9 +281,10 @@ impl TestServer {
}
let sid = nanoid::nanoid!(17);
- let track = Arc::new(RemoteAudioTrack {
+ let track = Arc::new(TestServerAudioTrack {
sid: sid.clone(),
publisher_id: identity.clone(),
+ muted: AtomicBool::new(false),
});
let publication = Arc::new(RemoteTrackPublication);
@@ -275,7 +298,10 @@ impl TestServer {
.lock()
.updates_tx
.try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
- track.clone(),
+ Arc::new(RemoteAudioTrack {
+ server_track: track.clone(),
+ room: Arc::downgrade(&client_room),
+ }),
publication.clone(),
))
.unwrap();
@@ -285,37 +311,123 @@ impl TestServer {
Ok(sid)
}
+ fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> {
+ let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+ let room_name = claims.video.room.unwrap();
+ let identity = claims.sub.unwrap();
+ let mut server_rooms = self.rooms.lock();
+ let room = server_rooms
+ .get_mut(&*room_name)
+ .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+ if let Some(track) = room
+ .audio_tracks
+ .iter_mut()
+ .find(|track| track.sid == track_sid)
+ {
+ track.muted.store(muted, SeqCst);
+ for (id, client_room) in room.client_rooms.iter() {
+ if *id != identity {
+ client_room
+ .0
+ .lock()
+ .updates_tx
+ .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged {
+ track_id: track_sid.to_string(),
+ muted,
+ })
+ .unwrap();
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn is_track_muted(&self, token: &str, track_sid: &str) -> Option<bool> {
+ let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?;
+ let room_name = claims.video.room.unwrap();
+
+ let mut server_rooms = self.rooms.lock();
+ let room = server_rooms.get_mut(&*room_name)?;
+ room.audio_tracks.iter().find_map(|track| {
+ if track.sid == track_sid {
+ Some(track.muted.load(SeqCst))
+ } else {
+ None
+ }
+ })
+ }
+
fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let room_name = claims.video.room.unwrap();
+ let identity = claims.sub.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
- Ok(room.video_tracks.clone())
+ room.client_rooms
+ .get(identity.as_ref())
+ .ok_or_else(|| anyhow!("not a participant in room"))?;
+ Ok(room
+ .video_tracks
+ .iter()
+ .map(|track| {
+ Arc::new(RemoteVideoTrack {
+ server_track: track.clone(),
+ })
+ })
+ .collect())
}
fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let room_name = claims.video.room.unwrap();
+ let identity = claims.sub.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
- Ok(room.audio_tracks.clone())
+ let client_room = room
+ .client_rooms
+ .get(identity.as_ref())
+ .ok_or_else(|| anyhow!("not a participant in room"))?;
+ Ok(room
+ .audio_tracks
+ .iter()
+ .map(|track| {
+ Arc::new(RemoteAudioTrack {
+ server_track: track.clone(),
+ room: Arc::downgrade(&client_room),
+ })
+ })
+ .collect())
}
}
#[derive(Default)]
struct TestServerRoom {
client_rooms: HashMap<Sid, Arc<Room>>,
- video_tracks: Vec<Arc<RemoteVideoTrack>>,
- audio_tracks: Vec<Arc<RemoteAudioTrack>>,
+ video_tracks: Vec<Arc<TestServerVideoTrack>>,
+ audio_tracks: Vec<Arc<TestServerAudioTrack>>,
participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
}
+#[derive(Debug)]
+struct TestServerVideoTrack {
+ sid: Sid,
+ publisher_id: Sid,
+ frames_rx: async_broadcast::Receiver<Frame>,
+}
+
+#[derive(Debug)]
+struct TestServerAudioTrack {
+ sid: Sid,
+ publisher_id: Sid,
+ muted: AtomicBool,
+}
+
impl TestServerRoom {}
pub struct TestApiClient {
@@ -386,6 +498,7 @@ struct RoomState {
watch::Receiver<ConnectionState>,
),
display_sources: Vec<MacOSDisplay>,
+ paused_audio_tracks: HashSet<Sid>,
updates_tx: async_broadcast::Sender<RoomUpdate>,
updates_rx: async_broadcast::Receiver<RoomUpdate>,
}
@@ -398,6 +511,7 @@ impl Room {
Arc::new(Self(Mutex::new(RoomState {
connection: watch::channel_with(ConnectionState::Disconnected),
display_sources: Default::default(),
+ paused_audio_tracks: Default::default(),
updates_tx,
updates_rx,
})))
@@ -443,11 +557,12 @@ impl Room {
.publish_video_track(this.token(), track)
.await?;
Ok(LocalTrackPublication {
- muted: Default::default(),
+ room: Arc::downgrade(&this),
sid,
})
}
}
+
pub fn publish_audio_track(
self: &Arc<Self>,
track: LocalAudioTrack,
@@ -460,7 +575,7 @@ impl Room {
.publish_audio_track(this.token(), &track)
.await?;
Ok(LocalTrackPublication {
- muted: Default::default(),
+ room: Arc::downgrade(&this),
sid,
})
}
@@ -560,20 +675,31 @@ impl Drop for Room {
#[derive(Clone)]
pub struct LocalTrackPublication {
sid: String,
- muted: Arc<AtomicBool>,
+ room: Weak<Room>,
}
impl LocalTrackPublication {
pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
- let muted = self.muted.clone();
+ let sid = self.sid.clone();
+ let room = self.room.clone();
async move {
- muted.store(mute, SeqCst);
- Ok(())
+ if let Some(room) = room.upgrade() {
+ room.test_server()
+ .set_track_muted(&room.token(), &sid, mute)
+ } else {
+ Err(anyhow!("no such room"))
+ }
}
}
pub fn is_muted(&self) -> bool {
- self.muted.load(SeqCst)
+ if let Some(room) = self.room.upgrade() {
+ room.test_server()
+ .is_track_muted(&room.token(), &self.sid)
+ .unwrap_or(false)
+ } else {
+ false
+ }
}
pub fn sid(&self) -> String {
@@ -621,46 +747,65 @@ impl LocalAudioTrack {
#[derive(Debug)]
pub struct RemoteVideoTrack {
- sid: Sid,
- publisher_id: Sid,
- frames_rx: async_broadcast::Receiver<Frame>,
+ server_track: Arc<TestServerVideoTrack>,
}
impl RemoteVideoTrack {
pub fn sid(&self) -> &str {
- &self.sid
+ &self.server_track.sid
}
pub fn publisher_id(&self) -> &str {
- &self.publisher_id
+ &self.server_track.publisher_id
}
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
- self.frames_rx.clone()
+ self.server_track.frames_rx.clone()
}
}
#[derive(Debug)]
pub struct RemoteAudioTrack {
- sid: Sid,
- publisher_id: Sid,
+ server_track: Arc<TestServerAudioTrack>,
+ room: Weak<Room>,
}
impl RemoteAudioTrack {
pub fn sid(&self) -> &str {
- &self.sid
+ &self.server_track.sid
}
pub fn publisher_id(&self) -> &str {
- &self.publisher_id
+ &self.server_track.publisher_id
}
- pub fn enable(&self) -> impl Future<Output = Result<()>> {
- async { Ok(()) }
+ pub fn start(&self) {
+ if let Some(room) = self.room.upgrade() {
+ room.0
+ .lock()
+ .paused_audio_tracks
+ .remove(&self.server_track.sid);
+ }
}
- pub fn disable(&self) -> impl Future<Output = Result<()>> {
- async { Ok(()) }
+ pub fn stop(&self) {
+ if let Some(room) = self.room.upgrade() {
+ room.0
+ .lock()
+ .paused_audio_tracks
+ .insert(self.server_track.sid.clone());
+ }
+ }
+
+ pub fn is_playing(&self) -> bool {
+ !self
+ .room
+ .upgrade()
+ .unwrap()
+ .0
+ .lock()
+ .paused_audio_tracks
+ .contains(&self.server_track.sid)
}
}