diff --git a/crates/call/src/participant.rs b/crates/call/src/participant.rs index 4b9e7ba034fb23d118cb5963d335e5c16201d590..90f256489d2d40f39ad1d49f6266980cc13951bb 100644 --- a/crates/call/src/participant.rs +++ b/crates/call/src/participant.rs @@ -3,6 +3,7 @@ use client::{proto, User}; use collections::HashMap; use gpui::WeakModelHandle; pub use live_kit_client::Frame; +use live_kit_client::RemoteAudioTrack; use project::Project; use std::{fmt, sync::Arc}; @@ -42,7 +43,8 @@ pub struct RemoteParticipant { pub peer_id: proto::PeerId, pub projects: Vec, pub location: ParticipantLocation, - pub tracks: HashMap>, + pub video_tracks: HashMap>, + pub audio_tracks: HashMap>, } #[derive(Clone)] diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 775342359f4af8c68ea1caa1bf25affb31c4ba8d..257a612f08162e70c0f5ec752758f27fa969362f 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -12,7 +12,9 @@ use fs::Fs; use futures::{FutureExt, StreamExt}; use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle}; use language::LanguageRegistry; -use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate}; +use live_kit_client::{ + LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate, RemoteAudioTrackUpdate, +}; use postage::stream::Stream; use project::Project; use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration}; @@ -28,6 +30,9 @@ pub enum Event { RemoteVideoTracksChanged { participant_id: proto::PeerId, }, + RemoteAudioTracksChanged { + participant_id: proto::PeerId, + }, RemoteProjectShared { owner: Arc, project_id: u64, @@ -112,9 +117,9 @@ impl Room { } }); - let mut track_changes = room.remote_video_track_updates(); - let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move { - while let Some(track_change) = track_changes.next().await { + let mut track_video_changes = room.remote_video_track_updates(); + let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move { + while let Some(track_change) = track_video_changes.next().await { let this = if let Some(this) = this.upgrade(&cx) { this } else { @@ -127,16 +132,32 @@ impl Room { } }); + let mut track_audio_changes = room.remote_audio_track_updates(); + let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move { + while let Some(track_change) = track_audio_changes.next().await { + let this = if let Some(this) = this.upgrade(&cx) { + this + } else { + break; + }; + + this.update(&mut cx, |this, cx| { + this.remote_audio_track_updated(track_change, cx).log_err() + }); + } + }); + cx.foreground() .spawn(room.connect(&connection_info.server_url, &connection_info.token)) .detach_and_log_err(cx); Some(LiveKitRoom { room, - screen_track: ScreenTrack::None, + screen_track: Track::None, + microphone_track: Track::None, next_publish_id: 0, _maintain_room, - _maintain_tracks, + _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], }) } else { None @@ -197,6 +218,12 @@ impl Room { None }; + let share_mic = room.update(&mut cx, |room, cx| { + room.share_mic(cx) + }); + + cx.background().spawn(share_mic).detach(); + match room .update(&mut cx, |room, cx| { room.leave_when_empty = true; @@ -618,20 +645,26 @@ impl Room { peer_id, projects: participant.projects, location, - tracks: Default::default(), + video_tracks: Default::default(), + audio_tracks: Default::default(), }, ); if let Some(live_kit) = this.live_kit.as_ref() { - let tracks = + let video_tracks = live_kit.room.remote_video_tracks(&user.id.to_string()); - for track in tracks { + let audio_tracks = live_kit.room.remote_audio_tracks(&user.id.to_string()); + for track in video_tracks { this.remote_video_track_updated( RemoteVideoTrackUpdate::Subscribed(track), cx, ) .log_err(); } + for track in audio_tracks { + this.remote_audio_track_updated(RemoteAudioTrackUpdate::Subscribed(track), cx) + .log_err(); + } } } } @@ -706,7 +739,7 @@ impl Room { .remote_participants .get_mut(&user_id) .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; - participant.tracks.insert( + participant.video_tracks.insert( track_id.clone(), Arc::new(RemoteVideoTrack { live_kit_track: track, @@ -725,7 +758,7 @@ impl Room { .remote_participants .get_mut(&user_id) .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?; - participant.tracks.remove(&track_id); + participant.video_tracks.remove(&track_id); cx.emit(Event::RemoteVideoTracksChanged { participant_id: participant.peer_id, }); @@ -736,6 +769,47 @@ impl Room { Ok(()) } + fn remote_audio_track_updated( + &mut self, + change: RemoteAudioTrackUpdate, + cx: &mut ModelContext, + ) -> Result<()> { + match change { + RemoteAudioTrackUpdate::Subscribed(track) => { + let user_id = track.publisher_id().parse()?; + let track_id = track.sid().to_string(); + let participant = self + .remote_participants + .get_mut(&user_id) + .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; + participant.audio_tracks.insert( + track_id.clone(), + track, + ); + cx.emit(Event::RemoteAudioTracksChanged { + participant_id: participant.peer_id, + }); + } + RemoteAudioTrackUpdate::Unsubscribed { + publisher_id, + track_id, + } => { + let user_id = publisher_id.parse()?; + let participant = self + .remote_participants + .get_mut(&user_id) + .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?; + participant.audio_tracks.remove(&track_id); + cx.emit(Event::RemoteAudioTracksChanged { + participant_id: participant.peer_id, + }); + } + } + + cx.notify(); + Ok(()) + } + fn check_invariants(&self) { #[cfg(any(test, feature = "test-support"))] { @@ -908,7 +982,85 @@ impl Room { pub fn is_screen_sharing(&self) -> bool { self.live_kit.as_ref().map_or(false, |live_kit| { - !matches!(live_kit.screen_track, ScreenTrack::None) + !matches!(live_kit.screen_track, Track::None) + }) + } + + pub fn is_sharing_mic(&self) -> bool { + self.live_kit.as_ref().map_or(false, |live_kit| { + !matches!(live_kit.microphone_track, Track::None) + }) + } + + pub fn share_mic(&mut self, cx: &mut ModelContext) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } else if self.is_sharing_mic() { + return Task::ready(Err(anyhow!("microphone was already shared"))); + } + + let publish_id = if let Some(live_kit) = self.live_kit.as_mut() { + let publish_id = post_inc(&mut live_kit.next_publish_id); + live_kit.microphone_track = Track::Pending { publish_id }; + cx.notify(); + publish_id + } else { + return Task::ready(Err(anyhow!("live-kit was not initialized"))); + }; + + cx.spawn_weak(|this, mut cx| async move { + let publish_track = async { + let track = LocalAudioTrack::create(); + this.upgrade(&cx) + .ok_or_else(|| anyhow!("room was dropped"))? + .read_with(&cx, |this, _| { + this.live_kit + .as_ref() + .map(|live_kit| live_kit.room.publish_audio_track(&track)) + }) + .ok_or_else(|| anyhow!("live-kit was not initialized"))? + .await + }; + + let publication = publish_track.await; + this.upgrade(&cx) + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, cx| { + let live_kit = this + .live_kit + .as_mut() + .ok_or_else(|| anyhow!("live-kit was not initialized"))?; + + let canceled = if let Track::Pending { + publish_id: cur_publish_id, + } = &live_kit.microphone_track + { + *cur_publish_id != publish_id + } else { + true + }; + + match publication { + Ok(publication) => { + if canceled { + live_kit.room.unpublish_track(publication); + } else { + live_kit.microphone_track = Track::Published(publication); + cx.notify(); + } + Ok(()) + } + Err(error) => { + if canceled { + Ok(()) + } else { + live_kit.microphone_track = Track::None; + cx.notify(); + Err(error) + } + } + } + }) }) } @@ -921,7 +1073,7 @@ impl Room { let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() { let publish_id = post_inc(&mut live_kit.next_publish_id); - live_kit.screen_track = ScreenTrack::Pending { publish_id }; + live_kit.screen_track = Track::Pending { publish_id }; cx.notify(); (live_kit.room.display_sources(), publish_id) } else { @@ -955,7 +1107,7 @@ impl Room { .as_mut() .ok_or_else(|| anyhow!("live-kit was not initialized"))?; - let canceled = if let ScreenTrack::Pending { + let canceled = if let Track::Pending { publish_id: cur_publish_id, } = &live_kit.screen_track { @@ -969,7 +1121,7 @@ impl Room { if canceled { live_kit.room.unpublish_track(publication); } else { - live_kit.screen_track = ScreenTrack::Published(publication); + live_kit.screen_track = Track::Published(publication); cx.notify(); } Ok(()) @@ -978,7 +1130,7 @@ impl Room { if canceled { Ok(()) } else { - live_kit.screen_track = ScreenTrack::None; + live_kit.screen_track = Track::None; cx.notify(); Err(error) } @@ -998,12 +1150,12 @@ impl Room { .as_mut() .ok_or_else(|| anyhow!("live-kit was not initialized"))?; match mem::take(&mut live_kit.screen_track) { - ScreenTrack::None => Err(anyhow!("screen was not shared")), - ScreenTrack::Pending { .. } => { + Track::None => Err(anyhow!("screen was not shared")), + Track::Pending { .. } => { cx.notify(); Ok(()) } - ScreenTrack::Published(track) => { + Track::Published(track) => { live_kit.room.unpublish_track(track); cx.notify(); Ok(()) @@ -1023,19 +1175,20 @@ impl Room { struct LiveKitRoom { room: Arc, - screen_track: ScreenTrack, + screen_track: Track, + microphone_track: Track, next_publish_id: usize, _maintain_room: Task<()>, - _maintain_tracks: Task<()>, + _maintain_tracks: [Task<()>; 2], } -enum ScreenTrack { +enum Track { None, Pending { publish_id: usize }, Published(LocalTrackPublication), } -impl Default for ScreenTrack { +impl Default for Track { fn default() -> Self { Self::None } diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 92b63478cbf1d20bf0597857a96ef88e0590efe2..b2c85f268d577aef4c518be93e902fd654e46b8e 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -257,7 +257,7 @@ async fn test_basic_calls( room_b.read_with(cx_b, |room, _| { assert_eq!( room.remote_participants()[&client_a.user_id().unwrap()] - .tracks + .video_tracks .len(), 1 ); @@ -274,7 +274,7 @@ async fn test_basic_calls( room_c.read_with(cx_c, |room, _| { assert_eq!( room.remote_participants()[&client_a.user_id().unwrap()] - .tracks + .video_tracks .len(), 1 ); @@ -6993,7 +6993,7 @@ async fn test_join_call_after_screen_was_shared( room.remote_participants() .get(&client_a.user_id().unwrap()) .unwrap() - .tracks + .video_tracks .len(), 1 ); diff --git a/crates/collab_ui/src/contact_list.rs b/crates/collab_ui/src/contact_list.rs index e8dae210c4c5ed196207fefb285c21c5a25bd1e1..2dc1fe3f1ba93068853122fb151388b8dc87ccab 100644 --- a/crates/collab_ui/src/contact_list.rs +++ b/crates/collab_ui/src/contact_list.rs @@ -514,10 +514,10 @@ impl ContactList { project_id: project.id, worktree_root_names: project.worktree_root_names.clone(), host_user_id: participant.user.id, - is_last: projects.peek().is_none() && participant.tracks.is_empty(), + is_last: projects.peek().is_none() && participant.video_tracks.is_empty(), }); } - if !participant.tracks.is_empty() { + if !participant.video_tracks.is_empty() { participant_entries.push(ContactEntry::ParticipantScreen { peer_id: participant.peer_id, is_last: true, diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 2f89617363db7a9e9f51251ebf70b5e8b0eea482..232fc7cf4b6f2e8d4f57b1e6bed58bf9f0b0c671 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -517,6 +517,7 @@ impl RemoteVideoTrack { } } +#[derive(Debug)] pub struct RemoteAudioTrack { sid: Sid, publisher_id: Sid, diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index f813635941d0bea70c6d6b4abb527f8f74b89ef5..85f2580a9af70fd6a381f2aea26aa3abb13bf129 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -2750,7 +2750,7 @@ impl Workspace { let call = self.active_call()?; let room = call.read(cx).room()?.read(cx); let participant = room.remote_participant_for_peer_id(peer_id)?; - let track = participant.tracks.values().next()?.clone(); + let track = participant.video_tracks.values().next()?.clone(); let user = participant.user.clone(); for item in pane.read(cx).items_of_type::() {