@@ -12,7 +12,9 @@ use fs::Fs;
use futures::{FutureExt, StreamExt};
use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
use language::LanguageRegistry;
-use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
+use live_kit_client::{
+ LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate, RemoteAudioTrackUpdate,
+};
use postage::stream::Stream;
use project::Project;
use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
@@ -28,6 +30,9 @@ pub enum Event {
RemoteVideoTracksChanged {
participant_id: proto::PeerId,
},
+ RemoteAudioTracksChanged {
+ participant_id: proto::PeerId,
+ },
RemoteProjectShared {
owner: Arc<User>,
project_id: u64,
@@ -112,9 +117,9 @@ impl Room {
}
});
- let mut track_changes = room.remote_video_track_updates();
- let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
- while let Some(track_change) = track_changes.next().await {
+ let mut track_video_changes = room.remote_video_track_updates();
+ let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
+ while let Some(track_change) = track_video_changes.next().await {
let this = if let Some(this) = this.upgrade(&cx) {
this
} else {
@@ -127,16 +132,32 @@ impl Room {
}
});
+ let mut track_audio_changes = room.remote_audio_track_updates();
+ let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
+ while let Some(track_change) = track_audio_changes.next().await {
+ let this = if let Some(this) = this.upgrade(&cx) {
+ this
+ } else {
+ break;
+ };
+
+ this.update(&mut cx, |this, cx| {
+ this.remote_audio_track_updated(track_change, cx).log_err()
+ });
+ }
+ });
+
cx.foreground()
.spawn(room.connect(&connection_info.server_url, &connection_info.token))
.detach_and_log_err(cx);
Some(LiveKitRoom {
room,
- screen_track: ScreenTrack::None,
+ screen_track: Track::None,
+ microphone_track: Track::None,
next_publish_id: 0,
_maintain_room,
- _maintain_tracks,
+ _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
})
} else {
None
@@ -197,6 +218,12 @@ impl Room {
None
};
+ let share_mic = room.update(&mut cx, |room, cx| {
+ room.share_mic(cx)
+ });
+
+ cx.background().spawn(share_mic).detach();
+
match room
.update(&mut cx, |room, cx| {
room.leave_when_empty = true;
@@ -618,20 +645,26 @@ impl Room {
peer_id,
projects: participant.projects,
location,
- tracks: Default::default(),
+ video_tracks: Default::default(),
+ audio_tracks: Default::default(),
},
);
if let Some(live_kit) = this.live_kit.as_ref() {
- let tracks =
+ let video_tracks =
live_kit.room.remote_video_tracks(&user.id.to_string());
- for track in tracks {
+ let audio_tracks = live_kit.room.remote_audio_tracks(&user.id.to_string());
+ for track in video_tracks {
this.remote_video_track_updated(
RemoteVideoTrackUpdate::Subscribed(track),
cx,
)
.log_err();
}
+ for track in audio_tracks {
+ this.remote_audio_track_updated(RemoteAudioTrackUpdate::Subscribed(track), cx)
+ .log_err();
+ }
}
}
}
@@ -706,7 +739,7 @@ impl Room {
.remote_participants
.get_mut(&user_id)
.ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
- participant.tracks.insert(
+ participant.video_tracks.insert(
track_id.clone(),
Arc::new(RemoteVideoTrack {
live_kit_track: track,
@@ -725,7 +758,7 @@ impl Room {
.remote_participants
.get_mut(&user_id)
.ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
- participant.tracks.remove(&track_id);
+ participant.video_tracks.remove(&track_id);
cx.emit(Event::RemoteVideoTracksChanged {
participant_id: participant.peer_id,
});
@@ -736,6 +769,47 @@ impl Room {
Ok(())
}
+ fn remote_audio_track_updated(
+ &mut self,
+ change: RemoteAudioTrackUpdate,
+ cx: &mut ModelContext<Self>,
+ ) -> Result<()> {
+ match change {
+ RemoteAudioTrackUpdate::Subscribed(track) => {
+ let user_id = track.publisher_id().parse()?;
+ let track_id = track.sid().to_string();
+ let participant = self
+ .remote_participants
+ .get_mut(&user_id)
+ .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
+ participant.audio_tracks.insert(
+ track_id.clone(),
+ track,
+ );
+ cx.emit(Event::RemoteAudioTracksChanged {
+ participant_id: participant.peer_id,
+ });
+ }
+ RemoteAudioTrackUpdate::Unsubscribed {
+ publisher_id,
+ track_id,
+ } => {
+ let user_id = publisher_id.parse()?;
+ let participant = self
+ .remote_participants
+ .get_mut(&user_id)
+ .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
+ participant.audio_tracks.remove(&track_id);
+ cx.emit(Event::RemoteAudioTracksChanged {
+ participant_id: participant.peer_id,
+ });
+ }
+ }
+
+ cx.notify();
+ Ok(())
+ }
+
fn check_invariants(&self) {
#[cfg(any(test, feature = "test-support"))]
{
@@ -908,7 +982,85 @@ impl Room {
pub fn is_screen_sharing(&self) -> bool {
self.live_kit.as_ref().map_or(false, |live_kit| {
- !matches!(live_kit.screen_track, ScreenTrack::None)
+ !matches!(live_kit.screen_track, Track::None)
+ })
+ }
+
+ pub fn is_sharing_mic(&self) -> bool {
+ self.live_kit.as_ref().map_or(false, |live_kit| {
+ !matches!(live_kit.microphone_track, Track::None)
+ })
+ }
+
+ pub fn share_mic(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+ if self.status.is_offline() {
+ return Task::ready(Err(anyhow!("room is offline")));
+ } else if self.is_sharing_mic() {
+ return Task::ready(Err(anyhow!("microphone was already shared")));
+ }
+
+ let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
+ let publish_id = post_inc(&mut live_kit.next_publish_id);
+ live_kit.microphone_track = Track::Pending { publish_id };
+ cx.notify();
+ publish_id
+ } else {
+ return Task::ready(Err(anyhow!("live-kit was not initialized")));
+ };
+
+ cx.spawn_weak(|this, mut cx| async move {
+ let publish_track = async {
+ let track = LocalAudioTrack::create();
+ this.upgrade(&cx)
+ .ok_or_else(|| anyhow!("room was dropped"))?
+ .read_with(&cx, |this, _| {
+ this.live_kit
+ .as_ref()
+ .map(|live_kit| live_kit.room.publish_audio_track(&track))
+ })
+ .ok_or_else(|| anyhow!("live-kit was not initialized"))?
+ .await
+ };
+
+ let publication = publish_track.await;
+ this.upgrade(&cx)
+ .ok_or_else(|| anyhow!("room was dropped"))?
+ .update(&mut cx, |this, cx| {
+ let live_kit = this
+ .live_kit
+ .as_mut()
+ .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
+
+ let canceled = if let Track::Pending {
+ publish_id: cur_publish_id,
+ } = &live_kit.microphone_track
+ {
+ *cur_publish_id != publish_id
+ } else {
+ true
+ };
+
+ match publication {
+ Ok(publication) => {
+ if canceled {
+ live_kit.room.unpublish_track(publication);
+ } else {
+ live_kit.microphone_track = Track::Published(publication);
+ cx.notify();
+ }
+ Ok(())
+ }
+ Err(error) => {
+ if canceled {
+ Ok(())
+ } else {
+ live_kit.microphone_track = Track::None;
+ cx.notify();
+ Err(error)
+ }
+ }
+ }
+ })
})
}
@@ -921,7 +1073,7 @@ impl Room {
let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
let publish_id = post_inc(&mut live_kit.next_publish_id);
- live_kit.screen_track = ScreenTrack::Pending { publish_id };
+ live_kit.screen_track = Track::Pending { publish_id };
cx.notify();
(live_kit.room.display_sources(), publish_id)
} else {
@@ -955,7 +1107,7 @@ impl Room {
.as_mut()
.ok_or_else(|| anyhow!("live-kit was not initialized"))?;
- let canceled = if let ScreenTrack::Pending {
+ let canceled = if let Track::Pending {
publish_id: cur_publish_id,
} = &live_kit.screen_track
{
@@ -969,7 +1121,7 @@ impl Room {
if canceled {
live_kit.room.unpublish_track(publication);
} else {
- live_kit.screen_track = ScreenTrack::Published(publication);
+ live_kit.screen_track = Track::Published(publication);
cx.notify();
}
Ok(())
@@ -978,7 +1130,7 @@ impl Room {
if canceled {
Ok(())
} else {
- live_kit.screen_track = ScreenTrack::None;
+ live_kit.screen_track = Track::None;
cx.notify();
Err(error)
}
@@ -998,12 +1150,12 @@ impl Room {
.as_mut()
.ok_or_else(|| anyhow!("live-kit was not initialized"))?;
match mem::take(&mut live_kit.screen_track) {
- ScreenTrack::None => Err(anyhow!("screen was not shared")),
- ScreenTrack::Pending { .. } => {
+ Track::None => Err(anyhow!("screen was not shared")),
+ Track::Pending { .. } => {
cx.notify();
Ok(())
}
- ScreenTrack::Published(track) => {
+ Track::Published(track) => {
live_kit.room.unpublish_track(track);
cx.notify();
Ok(())
@@ -1023,19 +1175,20 @@ impl Room {
struct LiveKitRoom {
room: Arc<live_kit_client::Room>,
- screen_track: ScreenTrack,
+ screen_track: Track,
+ microphone_track: Track,
next_publish_id: usize,
_maintain_room: Task<()>,
- _maintain_tracks: Task<()>,
+ _maintain_tracks: [Task<()>; 2],
}
-enum ScreenTrack {
+enum Track {
None,
Pending { publish_id: usize },
Published(LocalTrackPublication),
}
-impl Default for ScreenTrack {
+impl Default for Track {
fn default() -> Self {
Self::None
}