livekit_client.rs

  1use anyhow::{Context as _, Result, anyhow};
  2use audio::AudioSettings;
  3use collections::HashMap;
  4use futures::{SinkExt, channel::mpsc};
  5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  6use gpui_tokio::Tokio;
  7use log::info;
  8use playback::capture_local_video_track;
  9use settings::Settings;
 10use std::sync::{Arc, atomic::AtomicU64};
 11
 12mod playback;
 13
 14use crate::{
 15    ConnectionQuality, LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
 16    livekit_client::playback::Speaker,
 17};
 18pub use livekit::SessionStats;
 19pub use livekit::webrtc::stats::RtcStats;
 20pub use playback::AudioStream;
 21pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 22
 23#[derive(Clone, Debug)]
 24pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
 25#[derive(Clone, Debug)]
 26pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
 27#[derive(Clone, Debug)]
 28pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
 29#[derive(Clone, Debug)]
 30pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 31
 32#[derive(Clone, Debug)]
 33pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
 34#[derive(Clone, Debug)]
 35pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
 36#[derive(Clone, Debug)]
 37pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
 38#[derive(Clone, Debug)]
 39pub struct LocalParticipant(livekit::participant::LocalParticipant);
 40
/// A connected LiveKit room plus the helpers this module sets up for it in
/// `Room::connect`.
pub struct Room {
    /// The underlying LiveKit room handle.
    room: livekit::Room,
    /// Background task that forwards converted LiveKit events to the receiver
    /// returned by `Room::connect`.
    // NOTE(review): presumably stored so the task is not dropped while the
    // room is alive — confirm against gpui `Task` drop semantics.
    _task: Task<()>,
    /// Audio stack used for microphone capture and remote audio playback.
    playback: playback::AudioStack,
}
 46
/// Re-export of LiveKit's track SID type under this module's namespace.
pub type TrackSid = livekit::id::TrackSid;
/// Re-export of LiveKit's connection-state type under this module's namespace.
pub type ConnectionState = livekit::ConnectionState;
 49#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 50pub struct ParticipantIdentity(pub String);
 51
 52impl Room {
 53    pub async fn connect(
 54        url: String,
 55        token: String,
 56        cx: &mut AsyncApp,
 57    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 58        let mut config = livekit::RoomOptions::default();
 59        config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
 60        let (room, mut events) = Tokio::spawn(cx, async move {
 61            livekit::Room::connect(&url, &token, config).await
 62        })
 63        .await??;
 64
 65        let (mut tx, rx) = mpsc::unbounded();
 66        let task = cx.background_executor().spawn(async move {
 67            while let Some(event) = events.recv().await {
 68                if let Some(event) = room_event_from_livekit(event) {
 69                    tx.send(event).await.ok();
 70                }
 71            }
 72        });
 73
 74        Ok((
 75            Self {
 76                room,
 77                _task: task,
 78                playback: playback::AudioStack::new(cx.background_executor().clone()),
 79            },
 80            rx,
 81        ))
 82    }
 83
 84    pub fn local_participant(&self) -> LocalParticipant {
 85        LocalParticipant(self.room.local_participant())
 86    }
 87
 88    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 89        self.room
 90            .remote_participants()
 91            .into_iter()
 92            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 93            .collect()
 94    }
 95
 96    pub fn connection_state(&self) -> ConnectionState {
 97        self.room.connection_state()
 98    }
 99
100    pub fn name(&self) -> String {
101        self.room.name()
102    }
103
104    pub async fn sid(&self) -> String {
105        self.room.sid().await.to_string()
106    }
107
108    pub async fn publish_local_microphone_track(
109        &self,
110        user_name: String,
111        is_staff: bool,
112        cx: &mut AsyncApp,
113    ) -> Result<(LocalTrackPublication, playback::AudioStream, Arc<AtomicU64>)> {
114        let (track, stream, input_lag_us) = self
115            .playback
116            .capture_local_microphone_track(user_name, is_staff, &cx)?;
117        let publication = self
118            .local_participant()
119            .publish_track(
120                livekit::track::LocalTrack::Audio(track.0),
121                livekit::options::TrackPublishOptions {
122                    source: livekit::track::TrackSource::Microphone,
123                    ..Default::default()
124                },
125                cx,
126            )
127            .await?;
128
129        Ok((publication, stream, input_lag_us))
130    }
131
132    pub async fn unpublish_local_track(
133        &self,
134        sid: TrackSid,
135        cx: &mut AsyncApp,
136    ) -> Result<LocalTrackPublication> {
137        self.local_participant().unpublish_track(sid, cx).await
138    }
139
140    pub fn play_remote_audio_track(
141        &self,
142        track: &RemoteAudioTrack,
143        cx: &mut App,
144    ) -> Result<playback::AudioStream> {
145        let speaker: Speaker =
146            serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
147                name: track.0.name(),
148                is_staff: false,
149                sends_legacy_audio: true,
150            });
151
152        if AudioSettings::get_global(cx).rodio_audio {
153            info!("Using experimental.rodio_audio audio pipeline for output");
154            playback::play_remote_audio_track(&track.0, speaker, cx)
155        } else if speaker.sends_legacy_audio {
156            let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
157            Ok(self
158                .playback
159                .play_remote_audio_track(&track.0, output_audio_device))
160        } else {
161            Err(anyhow!("Client version too old to play audio in call"))
162        }
163    }
164
165    pub async fn get_stats(&self) -> Result<livekit::SessionStats> {
166        self.room.get_stats().await.map_err(anyhow::Error::from)
167    }
168
169    /// Returns a `Task` that fetches room stats on the Tokio runtime.
170    ///
171    /// LiveKit's SDK is Tokio-based, so the stats fetch must run within
172    /// a Tokio context rather than on GPUI's smol-based background executor.
173    pub fn stats_task(&self, cx: &impl gpui::AppContext) -> Task<Result<livekit::SessionStats>> {
174        let inner = self.room.clone();
175        Tokio::spawn_result(cx, async move {
176            inner.get_stats().await.map_err(anyhow::Error::from)
177        })
178    }
179}
180
181impl LocalParticipant {
182    pub fn connection_quality(&self) -> ConnectionQuality {
183        connection_quality_from_livekit(self.0.connection_quality())
184    }
185
186    pub fn audio_level(&self) -> f32 {
187        self.0.audio_level()
188    }
189
190    pub async fn publish_screenshare_track(
191        &self,
192        source: &dyn ScreenCaptureSource,
193        cx: &mut AsyncApp,
194    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
195        let (track, stream) = capture_local_video_track(source, cx).await?;
196        let options = livekit::options::TrackPublishOptions {
197            source: livekit::track::TrackSource::Screenshare,
198            video_codec: livekit::options::VideoCodec::VP8,
199            ..Default::default()
200        };
201        let publication = self
202            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
203            .await?;
204
205        Ok((publication, stream))
206    }
207
208    async fn publish_track(
209        &self,
210        track: livekit::track::LocalTrack,
211        options: livekit::options::TrackPublishOptions,
212        cx: &mut AsyncApp,
213    ) -> Result<LocalTrackPublication> {
214        let participant = self.0.clone();
215        Tokio::spawn(cx, async move {
216            participant.publish_track(track, options).await
217        })
218        .await?
219        .map(LocalTrackPublication)
220        .context("publishing a track")
221    }
222
223    pub async fn unpublish_track(
224        &self,
225        sid: TrackSid,
226        cx: &mut AsyncApp,
227    ) -> Result<LocalTrackPublication> {
228        let participant = self.0.clone();
229        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
230            .await?
231            .map(LocalTrackPublication)
232            .context("unpublishing a track")
233    }
234}
235
236impl LocalTrackPublication {
237    pub fn mute(&self, cx: &App) {
238        let track = self.0.clone();
239        Tokio::spawn(cx, async move {
240            track.mute();
241        })
242        .detach();
243    }
244
245    pub fn unmute(&self, cx: &App) {
246        let track = self.0.clone();
247        Tokio::spawn(cx, async move {
248            track.unmute();
249        })
250        .detach();
251    }
252
253    pub fn sid(&self) -> TrackSid {
254        self.0.sid()
255    }
256
257    pub fn is_muted(&self) -> bool {
258        self.0.is_muted()
259    }
260}
261
262impl RemoteParticipant {
263    pub fn connection_quality(&self) -> ConnectionQuality {
264        connection_quality_from_livekit(self.0.connection_quality())
265    }
266
267    pub fn audio_level(&self) -> f32 {
268        self.0.audio_level()
269    }
270
271    pub fn identity(&self) -> ParticipantIdentity {
272        ParticipantIdentity(self.0.identity().0)
273    }
274
275    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
276        self.0
277            .track_publications()
278            .into_iter()
279            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
280            .collect()
281    }
282}
283
284impl RemoteAudioTrack {
285    pub fn sid(&self) -> TrackSid {
286        self.0.sid()
287    }
288}
289
290impl RemoteVideoTrack {
291    pub fn sid(&self) -> TrackSid {
292        self.0.sid()
293    }
294}
295
296impl RemoteTrackPublication {
297    pub fn is_muted(&self) -> bool {
298        self.0.is_muted()
299    }
300
301    pub fn is_enabled(&self) -> bool {
302        self.0.is_enabled()
303    }
304
305    pub fn track(&self) -> Option<RemoteTrack> {
306        self.0.track().map(remote_track_from_livekit)
307    }
308
309    pub fn is_audio(&self) -> bool {
310        self.0.kind() == livekit::track::TrackKind::Audio
311    }
312
313    pub fn set_enabled(&self, enabled: bool, cx: &App) {
314        let track = self.0.clone();
315        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
316    }
317
318    pub fn sid(&self) -> TrackSid {
319        self.0.sid()
320    }
321}
322
323impl Participant {
324    pub fn identity(&self) -> ParticipantIdentity {
325        match self {
326            Participant::Local(local_participant) => {
327                ParticipantIdentity(local_participant.0.identity().0)
328            }
329            Participant::Remote(remote_participant) => {
330                ParticipantIdentity(remote_participant.0.identity().0)
331            }
332        }
333    }
334
335    pub fn connection_quality(&self) -> ConnectionQuality {
336        match self {
337            Participant::Local(local_participant) => local_participant.connection_quality(),
338            Participant::Remote(remote_participant) => remote_participant.connection_quality(),
339        }
340    }
341
342    pub fn audio_level(&self) -> f32 {
343        match self {
344            Participant::Local(local_participant) => local_participant.audio_level(),
345            Participant::Remote(remote_participant) => remote_participant.audio_level(),
346        }
347    }
348}
349
350fn connection_quality_from_livekit(
351    quality: livekit::prelude::ConnectionQuality,
352) -> ConnectionQuality {
353    match quality {
354        livekit::prelude::ConnectionQuality::Excellent => ConnectionQuality::Excellent,
355        livekit::prelude::ConnectionQuality::Good => ConnectionQuality::Good,
356        livekit::prelude::ConnectionQuality::Poor => ConnectionQuality::Poor,
357        livekit::prelude::ConnectionQuality::Lost => ConnectionQuality::Lost,
358    }
359}
360
361fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
362    match participant {
363        livekit::participant::Participant::Local(local) => {
364            Participant::Local(LocalParticipant(local))
365        }
366        livekit::participant::Participant::Remote(remote) => {
367            Participant::Remote(RemoteParticipant(remote))
368        }
369    }
370}
371
372fn publication_from_livekit(
373    publication: livekit::publication::TrackPublication,
374) -> TrackPublication {
375    match publication {
376        livekit::publication::TrackPublication::Local(local) => {
377            TrackPublication::Local(LocalTrackPublication(local))
378        }
379        livekit::publication::TrackPublication::Remote(remote) => {
380            TrackPublication::Remote(RemoteTrackPublication(remote))
381        }
382    }
383}
384
385fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
386    match track {
387        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
388        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
389    }
390}
391
392fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
393    match track {
394        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
395        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
396    }
397}
398fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
399    let event = match event {
400        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
401            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
402        }
403        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
404            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
405        }
406        livekit::RoomEvent::LocalTrackPublished {
407            publication,
408            track,
409            participant,
410        } => RoomEvent::LocalTrackPublished {
411            publication: LocalTrackPublication(publication),
412            track: local_track_from_livekit(track),
413            participant: LocalParticipant(participant),
414        },
415        livekit::RoomEvent::LocalTrackUnpublished {
416            publication,
417            participant,
418        } => RoomEvent::LocalTrackUnpublished {
419            publication: LocalTrackPublication(publication),
420            participant: LocalParticipant(participant),
421        },
422        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
423            track: local_track_from_livekit(track),
424        },
425        livekit::RoomEvent::TrackSubscribed {
426            track,
427            publication,
428            participant,
429        } => RoomEvent::TrackSubscribed {
430            track: remote_track_from_livekit(track),
431            publication: RemoteTrackPublication(publication),
432            participant: RemoteParticipant(participant),
433        },
434        livekit::RoomEvent::TrackUnsubscribed {
435            track,
436            publication,
437            participant,
438        } => RoomEvent::TrackUnsubscribed {
439            track: remote_track_from_livekit(track),
440            publication: RemoteTrackPublication(publication),
441            participant: RemoteParticipant(participant),
442        },
443        livekit::RoomEvent::TrackSubscriptionFailed {
444            participant,
445            error: _,
446            track_sid,
447        } => RoomEvent::TrackSubscriptionFailed {
448            participant: RemoteParticipant(participant),
449            track_sid,
450        },
451        livekit::RoomEvent::TrackPublished {
452            publication,
453            participant,
454        } => RoomEvent::TrackPublished {
455            publication: RemoteTrackPublication(publication),
456            participant: RemoteParticipant(participant),
457        },
458        livekit::RoomEvent::TrackUnpublished {
459            publication,
460            participant,
461        } => RoomEvent::TrackUnpublished {
462            publication: RemoteTrackPublication(publication),
463            participant: RemoteParticipant(participant),
464        },
465        livekit::RoomEvent::TrackMuted {
466            participant,
467            publication,
468        } => RoomEvent::TrackMuted {
469            publication: publication_from_livekit(publication),
470            participant: participant_from_livekit(participant),
471        },
472        livekit::RoomEvent::TrackUnmuted {
473            participant,
474            publication,
475        } => RoomEvent::TrackUnmuted {
476            publication: publication_from_livekit(publication),
477            participant: participant_from_livekit(participant),
478        },
479        livekit::RoomEvent::RoomMetadataChanged {
480            old_metadata,
481            metadata,
482        } => RoomEvent::RoomMetadataChanged {
483            old_metadata,
484            metadata,
485        },
486        livekit::RoomEvent::ParticipantMetadataChanged {
487            participant,
488            old_metadata,
489            metadata,
490        } => RoomEvent::ParticipantMetadataChanged {
491            participant: participant_from_livekit(participant),
492            old_metadata,
493            metadata,
494        },
495        livekit::RoomEvent::ParticipantNameChanged {
496            participant,
497            old_name,
498            name,
499        } => RoomEvent::ParticipantNameChanged {
500            participant: participant_from_livekit(participant),
501            old_name,
502            name,
503        },
504        livekit::RoomEvent::ParticipantAttributesChanged {
505            participant,
506            changed_attributes,
507        } => RoomEvent::ParticipantAttributesChanged {
508            participant: participant_from_livekit(participant),
509            changed_attributes: changed_attributes.into_iter().collect(),
510        },
511        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
512            RoomEvent::ActiveSpeakersChanged {
513                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
514            }
515        }
516        livekit::RoomEvent::Connected {
517            participants_with_tracks,
518        } => RoomEvent::Connected {
519            participants_with_tracks: participants_with_tracks
520                .into_iter()
521                .map({
522                    |(p, t)| {
523                        (
524                            RemoteParticipant(p),
525                            t.into_iter().map(RemoteTrackPublication).collect(),
526                        )
527                    }
528                })
529                .collect(),
530        },
531        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
532            reason: reason.as_str_name(),
533        },
534        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
535        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
536        livekit::RoomEvent::ConnectionQualityChanged {
537            quality,
538            participant,
539        } => RoomEvent::ConnectionQualityChanged {
540            participant: participant_from_livekit(participant),
541            quality: connection_quality_from_livekit(quality),
542        },
543        _ => {
544            log::trace!("dropping livekit event: {:?}", event);
545            return None;
546        }
547    };
548
549    Some(event)
550}