1use anyhow::{Context as _, Result, anyhow};
2use audio::AudioSettings;
3use collections::HashMap;
4use futures::{SinkExt, channel::mpsc};
5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
6use gpui_tokio::Tokio;
7use log::info;
8use playback::capture_local_video_track;
9use settings::Settings;
10use std::sync::{Arc, atomic::AtomicU64};
11
12mod playback;
13
14use crate::{
15 ConnectionQuality, LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
16 livekit_client::playback::Speaker,
17};
18pub use livekit::SessionStats;
19pub use livekit::webrtc::stats::RtcStats;
20pub use playback::AudioStream;
21pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
22
/// Newtype wrapper around [`livekit::track::RemoteVideoTrack`].
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype wrapper around [`livekit::track::RemoteAudioTrack`].
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype wrapper around [`livekit::publication::RemoteTrackPublication`].
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype wrapper around [`livekit::participant::RemoteParticipant`].
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
31
/// Newtype wrapper around [`livekit::track::LocalVideoTrack`].
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype wrapper around [`livekit::track::LocalAudioTrack`].
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype wrapper around [`livekit::publication::LocalTrackPublication`].
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype wrapper around [`livekit::participant::LocalParticipant`].
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
40
/// A LiveKit room together with the helpers this module keeps alongside it.
pub struct Room {
    /// The underlying LiveKit room handle that all queries are delegated to.
    room: livekit::Room,
    /// Background task that translates LiveKit events into [`RoomEvent`]s and
    /// forwards them to the receiver returned by [`Room::connect`]. It is
    /// stored here so that the task is owned by the room.
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and to play remote
    /// audio tracks on the non-rodio path.
    playback: playback::AudioStack,
}
46
/// Track identifier; an alias for LiveKit's own `TrackSid` type.
pub type TrackSid = livekit::id::TrackSid;
/// Room connection state; an alias for LiveKit's own `ConnectionState` type.
pub type ConnectionState = livekit::ConnectionState;
/// Identity of a participant, stored as the raw identity string.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
51
52impl Room {
53 pub async fn connect(
54 url: String,
55 token: String,
56 cx: &mut AsyncApp,
57 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
58 let mut config = livekit::RoomOptions::default();
59 config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
60 let (room, mut events) = Tokio::spawn(cx, async move {
61 livekit::Room::connect(&url, &token, config).await
62 })
63 .await??;
64
65 let (mut tx, rx) = mpsc::unbounded();
66 let task = cx.background_executor().spawn(async move {
67 while let Some(event) = events.recv().await {
68 if let Some(event) = room_event_from_livekit(event) {
69 tx.send(event).await.ok();
70 }
71 }
72 });
73
74 Ok((
75 Self {
76 room,
77 _task: task,
78 playback: playback::AudioStack::new(cx.background_executor().clone()),
79 },
80 rx,
81 ))
82 }
83
84 pub fn local_participant(&self) -> LocalParticipant {
85 LocalParticipant(self.room.local_participant())
86 }
87
88 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
89 self.room
90 .remote_participants()
91 .into_iter()
92 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
93 .collect()
94 }
95
96 pub fn connection_state(&self) -> ConnectionState {
97 self.room.connection_state()
98 }
99
100 pub fn name(&self) -> String {
101 self.room.name()
102 }
103
104 pub async fn sid(&self) -> String {
105 self.room.sid().await.to_string()
106 }
107
108 pub async fn publish_local_microphone_track(
109 &self,
110 user_name: String,
111 is_staff: bool,
112 cx: &mut AsyncApp,
113 ) -> Result<(LocalTrackPublication, playback::AudioStream, Arc<AtomicU64>)> {
114 let (track, stream, input_lag_us) = self
115 .playback
116 .capture_local_microphone_track(user_name, is_staff, &cx)?;
117 let publication = self
118 .local_participant()
119 .publish_track(
120 livekit::track::LocalTrack::Audio(track.0),
121 livekit::options::TrackPublishOptions {
122 source: livekit::track::TrackSource::Microphone,
123 ..Default::default()
124 },
125 cx,
126 )
127 .await?;
128
129 Ok((publication, stream, input_lag_us))
130 }
131
132 pub async fn unpublish_local_track(
133 &self,
134 sid: TrackSid,
135 cx: &mut AsyncApp,
136 ) -> Result<LocalTrackPublication> {
137 self.local_participant().unpublish_track(sid, cx).await
138 }
139
140 pub fn play_remote_audio_track(
141 &self,
142 track: &RemoteAudioTrack,
143 cx: &mut App,
144 ) -> Result<playback::AudioStream> {
145 let speaker: Speaker =
146 serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
147 name: track.0.name(),
148 is_staff: false,
149 sends_legacy_audio: true,
150 });
151
152 if AudioSettings::get_global(cx).rodio_audio {
153 info!("Using experimental.rodio_audio audio pipeline for output");
154 playback::play_remote_audio_track(&track.0, speaker, cx)
155 } else if speaker.sends_legacy_audio {
156 let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
157 Ok(self
158 .playback
159 .play_remote_audio_track(&track.0, output_audio_device))
160 } else {
161 Err(anyhow!("Client version too old to play audio in call"))
162 }
163 }
164
165 pub async fn get_stats(&self) -> Result<livekit::SessionStats> {
166 self.room.get_stats().await.map_err(anyhow::Error::from)
167 }
168
169 /// Returns a `Task` that fetches room stats on the Tokio runtime.
170 ///
171 /// LiveKit's SDK is Tokio-based, so the stats fetch must run within
172 /// a Tokio context rather than on GPUI's smol-based background executor.
173 pub fn stats_task(&self, cx: &impl gpui::AppContext) -> Task<Result<livekit::SessionStats>> {
174 let inner = self.room.clone();
175 Tokio::spawn_result(cx, async move {
176 inner.get_stats().await.map_err(anyhow::Error::from)
177 })
178 }
179}
180
181impl LocalParticipant {
182 pub fn connection_quality(&self) -> ConnectionQuality {
183 connection_quality_from_livekit(self.0.connection_quality())
184 }
185
186 pub fn audio_level(&self) -> f32 {
187 self.0.audio_level()
188 }
189
190 pub async fn publish_screenshare_track(
191 &self,
192 source: &dyn ScreenCaptureSource,
193 cx: &mut AsyncApp,
194 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
195 let (track, stream) = capture_local_video_track(source, cx).await?;
196 let options = livekit::options::TrackPublishOptions {
197 source: livekit::track::TrackSource::Screenshare,
198 video_codec: livekit::options::VideoCodec::VP8,
199 ..Default::default()
200 };
201 let publication = self
202 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
203 .await?;
204
205 Ok((publication, stream))
206 }
207
208 async fn publish_track(
209 &self,
210 track: livekit::track::LocalTrack,
211 options: livekit::options::TrackPublishOptions,
212 cx: &mut AsyncApp,
213 ) -> Result<LocalTrackPublication> {
214 let participant = self.0.clone();
215 Tokio::spawn(cx, async move {
216 participant.publish_track(track, options).await
217 })
218 .await?
219 .map(LocalTrackPublication)
220 .context("publishing a track")
221 }
222
223 pub async fn unpublish_track(
224 &self,
225 sid: TrackSid,
226 cx: &mut AsyncApp,
227 ) -> Result<LocalTrackPublication> {
228 let participant = self.0.clone();
229 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
230 .await?
231 .map(LocalTrackPublication)
232 .context("unpublishing a track")
233 }
234}
235
236impl LocalTrackPublication {
237 pub fn mute(&self, cx: &App) {
238 let track = self.0.clone();
239 Tokio::spawn(cx, async move {
240 track.mute();
241 })
242 .detach();
243 }
244
245 pub fn unmute(&self, cx: &App) {
246 let track = self.0.clone();
247 Tokio::spawn(cx, async move {
248 track.unmute();
249 })
250 .detach();
251 }
252
253 pub fn sid(&self) -> TrackSid {
254 self.0.sid()
255 }
256
257 pub fn is_muted(&self) -> bool {
258 self.0.is_muted()
259 }
260}
261
262impl RemoteParticipant {
263 pub fn connection_quality(&self) -> ConnectionQuality {
264 connection_quality_from_livekit(self.0.connection_quality())
265 }
266
267 pub fn audio_level(&self) -> f32 {
268 self.0.audio_level()
269 }
270
271 pub fn identity(&self) -> ParticipantIdentity {
272 ParticipantIdentity(self.0.identity().0)
273 }
274
275 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
276 self.0
277 .track_publications()
278 .into_iter()
279 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
280 .collect()
281 }
282}
283
284impl RemoteAudioTrack {
285 pub fn sid(&self) -> TrackSid {
286 self.0.sid()
287 }
288}
289
290impl RemoteVideoTrack {
291 pub fn sid(&self) -> TrackSid {
292 self.0.sid()
293 }
294}
295
296impl RemoteTrackPublication {
297 pub fn is_muted(&self) -> bool {
298 self.0.is_muted()
299 }
300
301 pub fn is_enabled(&self) -> bool {
302 self.0.is_enabled()
303 }
304
305 pub fn track(&self) -> Option<RemoteTrack> {
306 self.0.track().map(remote_track_from_livekit)
307 }
308
309 pub fn is_audio(&self) -> bool {
310 self.0.kind() == livekit::track::TrackKind::Audio
311 }
312
313 pub fn set_enabled(&self, enabled: bool, cx: &App) {
314 let track = self.0.clone();
315 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
316 }
317
318 pub fn sid(&self) -> TrackSid {
319 self.0.sid()
320 }
321}
322
323impl Participant {
324 pub fn identity(&self) -> ParticipantIdentity {
325 match self {
326 Participant::Local(local_participant) => {
327 ParticipantIdentity(local_participant.0.identity().0)
328 }
329 Participant::Remote(remote_participant) => {
330 ParticipantIdentity(remote_participant.0.identity().0)
331 }
332 }
333 }
334
335 pub fn connection_quality(&self) -> ConnectionQuality {
336 match self {
337 Participant::Local(local_participant) => local_participant.connection_quality(),
338 Participant::Remote(remote_participant) => remote_participant.connection_quality(),
339 }
340 }
341
342 pub fn audio_level(&self) -> f32 {
343 match self {
344 Participant::Local(local_participant) => local_participant.audio_level(),
345 Participant::Remote(remote_participant) => remote_participant.audio_level(),
346 }
347 }
348}
349
350fn connection_quality_from_livekit(
351 quality: livekit::prelude::ConnectionQuality,
352) -> ConnectionQuality {
353 match quality {
354 livekit::prelude::ConnectionQuality::Excellent => ConnectionQuality::Excellent,
355 livekit::prelude::ConnectionQuality::Good => ConnectionQuality::Good,
356 livekit::prelude::ConnectionQuality::Poor => ConnectionQuality::Poor,
357 livekit::prelude::ConnectionQuality::Lost => ConnectionQuality::Lost,
358 }
359}
360
361fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
362 match participant {
363 livekit::participant::Participant::Local(local) => {
364 Participant::Local(LocalParticipant(local))
365 }
366 livekit::participant::Participant::Remote(remote) => {
367 Participant::Remote(RemoteParticipant(remote))
368 }
369 }
370}
371
372fn publication_from_livekit(
373 publication: livekit::publication::TrackPublication,
374) -> TrackPublication {
375 match publication {
376 livekit::publication::TrackPublication::Local(local) => {
377 TrackPublication::Local(LocalTrackPublication(local))
378 }
379 livekit::publication::TrackPublication::Remote(remote) => {
380 TrackPublication::Remote(RemoteTrackPublication(remote))
381 }
382 }
383}
384
385fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
386 match track {
387 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
388 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
389 }
390}
391
392fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
393 match track {
394 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
395 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
396 }
397}
398fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
399 let event = match event {
400 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
401 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
402 }
403 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
404 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
405 }
406 livekit::RoomEvent::LocalTrackPublished {
407 publication,
408 track,
409 participant,
410 } => RoomEvent::LocalTrackPublished {
411 publication: LocalTrackPublication(publication),
412 track: local_track_from_livekit(track),
413 participant: LocalParticipant(participant),
414 },
415 livekit::RoomEvent::LocalTrackUnpublished {
416 publication,
417 participant,
418 } => RoomEvent::LocalTrackUnpublished {
419 publication: LocalTrackPublication(publication),
420 participant: LocalParticipant(participant),
421 },
422 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
423 track: local_track_from_livekit(track),
424 },
425 livekit::RoomEvent::TrackSubscribed {
426 track,
427 publication,
428 participant,
429 } => RoomEvent::TrackSubscribed {
430 track: remote_track_from_livekit(track),
431 publication: RemoteTrackPublication(publication),
432 participant: RemoteParticipant(participant),
433 },
434 livekit::RoomEvent::TrackUnsubscribed {
435 track,
436 publication,
437 participant,
438 } => RoomEvent::TrackUnsubscribed {
439 track: remote_track_from_livekit(track),
440 publication: RemoteTrackPublication(publication),
441 participant: RemoteParticipant(participant),
442 },
443 livekit::RoomEvent::TrackSubscriptionFailed {
444 participant,
445 error: _,
446 track_sid,
447 } => RoomEvent::TrackSubscriptionFailed {
448 participant: RemoteParticipant(participant),
449 track_sid,
450 },
451 livekit::RoomEvent::TrackPublished {
452 publication,
453 participant,
454 } => RoomEvent::TrackPublished {
455 publication: RemoteTrackPublication(publication),
456 participant: RemoteParticipant(participant),
457 },
458 livekit::RoomEvent::TrackUnpublished {
459 publication,
460 participant,
461 } => RoomEvent::TrackUnpublished {
462 publication: RemoteTrackPublication(publication),
463 participant: RemoteParticipant(participant),
464 },
465 livekit::RoomEvent::TrackMuted {
466 participant,
467 publication,
468 } => RoomEvent::TrackMuted {
469 publication: publication_from_livekit(publication),
470 participant: participant_from_livekit(participant),
471 },
472 livekit::RoomEvent::TrackUnmuted {
473 participant,
474 publication,
475 } => RoomEvent::TrackUnmuted {
476 publication: publication_from_livekit(publication),
477 participant: participant_from_livekit(participant),
478 },
479 livekit::RoomEvent::RoomMetadataChanged {
480 old_metadata,
481 metadata,
482 } => RoomEvent::RoomMetadataChanged {
483 old_metadata,
484 metadata,
485 },
486 livekit::RoomEvent::ParticipantMetadataChanged {
487 participant,
488 old_metadata,
489 metadata,
490 } => RoomEvent::ParticipantMetadataChanged {
491 participant: participant_from_livekit(participant),
492 old_metadata,
493 metadata,
494 },
495 livekit::RoomEvent::ParticipantNameChanged {
496 participant,
497 old_name,
498 name,
499 } => RoomEvent::ParticipantNameChanged {
500 participant: participant_from_livekit(participant),
501 old_name,
502 name,
503 },
504 livekit::RoomEvent::ParticipantAttributesChanged {
505 participant,
506 changed_attributes,
507 } => RoomEvent::ParticipantAttributesChanged {
508 participant: participant_from_livekit(participant),
509 changed_attributes: changed_attributes.into_iter().collect(),
510 },
511 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
512 RoomEvent::ActiveSpeakersChanged {
513 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
514 }
515 }
516 livekit::RoomEvent::Connected {
517 participants_with_tracks,
518 } => RoomEvent::Connected {
519 participants_with_tracks: participants_with_tracks
520 .into_iter()
521 .map({
522 |(p, t)| {
523 (
524 RemoteParticipant(p),
525 t.into_iter().map(RemoteTrackPublication).collect(),
526 )
527 }
528 })
529 .collect(),
530 },
531 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
532 reason: reason.as_str_name(),
533 },
534 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
535 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
536 livekit::RoomEvent::ConnectionQualityChanged {
537 quality,
538 participant,
539 } => RoomEvent::ConnectionQualityChanged {
540 participant: participant_from_livekit(participant),
541 quality: connection_quality_from_livekit(quality),
542 },
543 _ => {
544 log::trace!("dropping livekit event: {:?}", event);
545 return None;
546 }
547 };
548
549 Some(event)
550}