livekit_client.rs

  1use anyhow::{Context as _, Result, anyhow};
  2use audio::AudioSettings;
  3use collections::HashMap;
  4use futures::{SinkExt, channel::mpsc};
  5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  6use gpui_tokio::Tokio;
  7use log::info;
  8use playback::capture_local_video_track;
  9use settings::Settings;
 10use std::sync::{Arc, atomic::AtomicU64};
 11
 12#[cfg(target_os = "linux")]
 13mod linux;
 14mod playback;
 15
 16use crate::{
 17    ConnectionQuality, LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
 18    livekit_client::playback::Speaker,
 19};
 20pub use livekit::SessionStats;
 21pub use livekit::webrtc::stats::RtcStats;
 22pub use playback::AudioStream;
 23pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 24
 25#[derive(Clone, Debug)]
 26pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
 27#[derive(Clone, Debug)]
 28pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
 29#[derive(Clone, Debug)]
 30pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
 31#[derive(Clone, Debug)]
 32pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 33
 34#[derive(Clone, Debug)]
 35pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
 36#[derive(Clone, Debug)]
 37pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
 38#[derive(Clone, Debug)]
 39pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
 40#[derive(Clone, Debug)]
 41pub struct LocalParticipant(livekit::participant::LocalParticipant);
 42
/// A connected LiveKit room plus the resources created alongside it in
/// `Room::connect`.
pub struct Room {
    /// The underlying LiveKit room handle that every method delegates to.
    room: livekit::Room,
    /// Background task that forwards LiveKit events to the channel returned
    /// by `Room::connect`. Never read; presumably held so the task stays
    /// alive for as long as the room does.
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and to play remote
    /// audio tracks.
    playback: playback::AudioStack,
}
 48
/// Track identifier, re-exported from LiveKit unchanged.
pub type TrackSid = livekit::id::TrackSid;
/// Room connection state, re-exported from LiveKit unchanged.
pub type ConnectionState = livekit::ConnectionState;
 51#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 52pub struct ParticipantIdentity(pub String);
 53
 54impl Room {
 55    pub async fn connect(
 56        url: String,
 57        token: String,
 58        cx: &mut AsyncApp,
 59    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 60        let mut config = livekit::RoomOptions::default();
 61        config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
 62        let (room, mut events) = Tokio::spawn(cx, async move {
 63            livekit::Room::connect(&url, &token, config).await
 64        })
 65        .await??;
 66
 67        let (mut tx, rx) = mpsc::unbounded();
 68        let task = cx.background_executor().spawn(async move {
 69            while let Some(event) = events.recv().await {
 70                if let Some(event) = room_event_from_livekit(event) {
 71                    tx.send(event).await.ok();
 72                }
 73            }
 74        });
 75
 76        Ok((
 77            Self {
 78                room,
 79                _task: task,
 80                playback: playback::AudioStack::new(cx.background_executor().clone()),
 81            },
 82            rx,
 83        ))
 84    }
 85
 86    pub fn local_participant(&self) -> LocalParticipant {
 87        LocalParticipant(self.room.local_participant())
 88    }
 89
 90    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 91        self.room
 92            .remote_participants()
 93            .into_iter()
 94            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 95            .collect()
 96    }
 97
 98    pub fn connection_state(&self) -> ConnectionState {
 99        self.room.connection_state()
100    }
101
102    pub fn name(&self) -> String {
103        self.room.name()
104    }
105
106    pub async fn sid(&self) -> String {
107        self.room.sid().await.to_string()
108    }
109
110    pub async fn publish_local_microphone_track(
111        &self,
112        user_name: String,
113        is_staff: bool,
114        cx: &mut AsyncApp,
115    ) -> Result<(LocalTrackPublication, playback::AudioStream, Arc<AtomicU64>)> {
116        let (track, stream, input_lag_us) = self
117            .playback
118            .capture_local_microphone_track(user_name, is_staff, &cx)?;
119        let publication = self
120            .local_participant()
121            .publish_track(
122                livekit::track::LocalTrack::Audio(track.0),
123                livekit::options::TrackPublishOptions {
124                    source: livekit::track::TrackSource::Microphone,
125                    ..Default::default()
126                },
127                cx,
128            )
129            .await?;
130
131        Ok((publication, stream, input_lag_us))
132    }
133
134    pub async fn unpublish_local_track(
135        &self,
136        sid: TrackSid,
137        cx: &mut AsyncApp,
138    ) -> Result<LocalTrackPublication> {
139        self.local_participant().unpublish_track(sid, cx).await
140    }
141
142    pub fn play_remote_audio_track(
143        &self,
144        track: &RemoteAudioTrack,
145        cx: &mut App,
146    ) -> Result<playback::AudioStream> {
147        let speaker: Speaker =
148            serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
149                name: track.0.name(),
150                is_staff: false,
151                sends_legacy_audio: true,
152            });
153
154        if AudioSettings::get_global(cx).rodio_audio {
155            info!("Using experimental.rodio_audio audio pipeline for output");
156            playback::play_remote_audio_track(&track.0, speaker, cx)
157        } else if speaker.sends_legacy_audio {
158            let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
159            Ok(self
160                .playback
161                .play_remote_audio_track(&track.0, output_audio_device))
162        } else {
163            Err(anyhow!("Client version too old to play audio in call"))
164        }
165    }
166
167    pub async fn get_stats(&self) -> Result<livekit::SessionStats> {
168        self.room.get_stats().await.map_err(anyhow::Error::from)
169    }
170
171    /// Returns a `Task` that fetches room stats on the Tokio runtime.
172    ///
173    /// LiveKit's SDK is Tokio-based, so the stats fetch must run within
174    /// a Tokio context rather than on GPUI's smol-based background executor.
175    pub fn stats_task(&self, cx: &impl gpui::AppContext) -> Task<Result<livekit::SessionStats>> {
176        let inner = self.room.clone();
177        Tokio::spawn_result(cx, async move {
178            inner.get_stats().await.map_err(anyhow::Error::from)
179        })
180    }
181}
182
183impl LocalParticipant {
184    pub fn connection_quality(&self) -> ConnectionQuality {
185        connection_quality_from_livekit(self.0.connection_quality())
186    }
187
188    pub fn audio_level(&self) -> f32 {
189        self.0.audio_level()
190    }
191
192    pub async fn publish_screenshare_track(
193        &self,
194        source: &dyn ScreenCaptureSource,
195        cx: &mut AsyncApp,
196    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
197        let (track, stream) = capture_local_video_track(source, cx).await?;
198        let options = livekit::options::TrackPublishOptions {
199            source: livekit::track::TrackSource::Screenshare,
200            video_codec: livekit::options::VideoCodec::VP8,
201            ..Default::default()
202        };
203        let publication = self
204            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
205            .await?;
206
207        Ok((publication, stream))
208    }
209
210    async fn publish_track(
211        &self,
212        track: livekit::track::LocalTrack,
213        options: livekit::options::TrackPublishOptions,
214        cx: &mut AsyncApp,
215    ) -> Result<LocalTrackPublication> {
216        let participant = self.0.clone();
217        Tokio::spawn(cx, async move {
218            participant.publish_track(track, options).await
219        })
220        .await?
221        .map(LocalTrackPublication)
222        .context("publishing a track")
223    }
224
225    pub async fn unpublish_track(
226        &self,
227        sid: TrackSid,
228        cx: &mut AsyncApp,
229    ) -> Result<LocalTrackPublication> {
230        let participant = self.0.clone();
231        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
232            .await?
233            .map(LocalTrackPublication)
234            .context("unpublishing a track")
235    }
236
237    #[cfg(target_os = "linux")]
238    pub async fn publish_screenshare_track_wayland(
239        &self,
240        cx: &mut AsyncApp,
241    ) -> Result<(
242        LocalTrackPublication,
243        Box<dyn ScreenCaptureStream>,
244        futures::channel::oneshot::Receiver<()>,
245    )> {
246        let (track, stop_flag, feed_task, failure_rx) =
247            linux::start_wayland_desktop_capture(cx).await?;
248        let options = livekit::options::TrackPublishOptions {
249            source: livekit::track::TrackSource::Screenshare,
250            video_codec: livekit::options::VideoCodec::VP8,
251            ..Default::default()
252        };
253        let publication = self
254            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
255            .await?;
256
257        Ok((
258            publication,
259            Box::new(linux::WaylandScreenCaptureStream::new(stop_flag, feed_task)),
260            failure_rx,
261        ))
262    }
263}
264
265impl LocalTrackPublication {
266    pub fn mute(&self, cx: &App) {
267        let track = self.0.clone();
268        Tokio::spawn(cx, async move {
269            track.mute();
270        })
271        .detach();
272    }
273
274    pub fn unmute(&self, cx: &App) {
275        let track = self.0.clone();
276        Tokio::spawn(cx, async move {
277            track.unmute();
278        })
279        .detach();
280    }
281
282    pub fn sid(&self) -> TrackSid {
283        self.0.sid()
284    }
285
286    pub fn is_muted(&self) -> bool {
287        self.0.is_muted()
288    }
289}
290
291impl RemoteParticipant {
292    pub fn connection_quality(&self) -> ConnectionQuality {
293        connection_quality_from_livekit(self.0.connection_quality())
294    }
295
296    pub fn audio_level(&self) -> f32 {
297        self.0.audio_level()
298    }
299
300    pub fn identity(&self) -> ParticipantIdentity {
301        ParticipantIdentity(self.0.identity().0)
302    }
303
304    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
305        self.0
306            .track_publications()
307            .into_iter()
308            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
309            .collect()
310    }
311}
312
313impl RemoteAudioTrack {
314    pub fn sid(&self) -> TrackSid {
315        self.0.sid()
316    }
317}
318
319impl RemoteVideoTrack {
320    pub fn sid(&self) -> TrackSid {
321        self.0.sid()
322    }
323}
324
325impl RemoteTrackPublication {
326    pub fn is_muted(&self) -> bool {
327        self.0.is_muted()
328    }
329
330    pub fn is_enabled(&self) -> bool {
331        self.0.is_enabled()
332    }
333
334    pub fn track(&self) -> Option<RemoteTrack> {
335        self.0.track().map(remote_track_from_livekit)
336    }
337
338    pub fn is_audio(&self) -> bool {
339        self.0.kind() == livekit::track::TrackKind::Audio
340    }
341
342    pub fn set_enabled(&self, enabled: bool, cx: &App) {
343        let track = self.0.clone();
344        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
345    }
346
347    pub fn sid(&self) -> TrackSid {
348        self.0.sid()
349    }
350}
351
352impl Participant {
353    pub fn identity(&self) -> ParticipantIdentity {
354        match self {
355            Participant::Local(local_participant) => {
356                ParticipantIdentity(local_participant.0.identity().0)
357            }
358            Participant::Remote(remote_participant) => {
359                ParticipantIdentity(remote_participant.0.identity().0)
360            }
361        }
362    }
363
364    pub fn connection_quality(&self) -> ConnectionQuality {
365        match self {
366            Participant::Local(local_participant) => local_participant.connection_quality(),
367            Participant::Remote(remote_participant) => remote_participant.connection_quality(),
368        }
369    }
370
371    pub fn audio_level(&self) -> f32 {
372        match self {
373            Participant::Local(local_participant) => local_participant.audio_level(),
374            Participant::Remote(remote_participant) => remote_participant.audio_level(),
375        }
376    }
377}
378
379fn connection_quality_from_livekit(
380    quality: livekit::prelude::ConnectionQuality,
381) -> ConnectionQuality {
382    match quality {
383        livekit::prelude::ConnectionQuality::Excellent => ConnectionQuality::Excellent,
384        livekit::prelude::ConnectionQuality::Good => ConnectionQuality::Good,
385        livekit::prelude::ConnectionQuality::Poor => ConnectionQuality::Poor,
386        livekit::prelude::ConnectionQuality::Lost => ConnectionQuality::Lost,
387    }
388}
389
390fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
391    match participant {
392        livekit::participant::Participant::Local(local) => {
393            Participant::Local(LocalParticipant(local))
394        }
395        livekit::participant::Participant::Remote(remote) => {
396            Participant::Remote(RemoteParticipant(remote))
397        }
398    }
399}
400
401fn publication_from_livekit(
402    publication: livekit::publication::TrackPublication,
403) -> TrackPublication {
404    match publication {
405        livekit::publication::TrackPublication::Local(local) => {
406            TrackPublication::Local(LocalTrackPublication(local))
407        }
408        livekit::publication::TrackPublication::Remote(remote) => {
409            TrackPublication::Remote(RemoteTrackPublication(remote))
410        }
411    }
412}
413
414fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
415    match track {
416        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
417        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
418    }
419}
420
421fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
422    match track {
423        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
424        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
425    }
426}
427fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
428    let event = match event {
429        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
430            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
431        }
432        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
433            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
434        }
435        livekit::RoomEvent::LocalTrackPublished {
436            publication,
437            track,
438            participant,
439        } => RoomEvent::LocalTrackPublished {
440            publication: LocalTrackPublication(publication),
441            track: local_track_from_livekit(track),
442            participant: LocalParticipant(participant),
443        },
444        livekit::RoomEvent::LocalTrackUnpublished {
445            publication,
446            participant,
447        } => RoomEvent::LocalTrackUnpublished {
448            publication: LocalTrackPublication(publication),
449            participant: LocalParticipant(participant),
450        },
451        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
452            track: local_track_from_livekit(track),
453        },
454        livekit::RoomEvent::TrackSubscribed {
455            track,
456            publication,
457            participant,
458        } => RoomEvent::TrackSubscribed {
459            track: remote_track_from_livekit(track),
460            publication: RemoteTrackPublication(publication),
461            participant: RemoteParticipant(participant),
462        },
463        livekit::RoomEvent::TrackUnsubscribed {
464            track,
465            publication,
466            participant,
467        } => RoomEvent::TrackUnsubscribed {
468            track: remote_track_from_livekit(track),
469            publication: RemoteTrackPublication(publication),
470            participant: RemoteParticipant(participant),
471        },
472        livekit::RoomEvent::TrackSubscriptionFailed {
473            participant,
474            error: _,
475            track_sid,
476        } => RoomEvent::TrackSubscriptionFailed {
477            participant: RemoteParticipant(participant),
478            track_sid,
479        },
480        livekit::RoomEvent::TrackPublished {
481            publication,
482            participant,
483        } => RoomEvent::TrackPublished {
484            publication: RemoteTrackPublication(publication),
485            participant: RemoteParticipant(participant),
486        },
487        livekit::RoomEvent::TrackUnpublished {
488            publication,
489            participant,
490        } => RoomEvent::TrackUnpublished {
491            publication: RemoteTrackPublication(publication),
492            participant: RemoteParticipant(participant),
493        },
494        livekit::RoomEvent::TrackMuted {
495            participant,
496            publication,
497        } => RoomEvent::TrackMuted {
498            publication: publication_from_livekit(publication),
499            participant: participant_from_livekit(participant),
500        },
501        livekit::RoomEvent::TrackUnmuted {
502            participant,
503            publication,
504        } => RoomEvent::TrackUnmuted {
505            publication: publication_from_livekit(publication),
506            participant: participant_from_livekit(participant),
507        },
508        livekit::RoomEvent::RoomMetadataChanged {
509            old_metadata,
510            metadata,
511        } => RoomEvent::RoomMetadataChanged {
512            old_metadata,
513            metadata,
514        },
515        livekit::RoomEvent::ParticipantMetadataChanged {
516            participant,
517            old_metadata,
518            metadata,
519        } => RoomEvent::ParticipantMetadataChanged {
520            participant: participant_from_livekit(participant),
521            old_metadata,
522            metadata,
523        },
524        livekit::RoomEvent::ParticipantNameChanged {
525            participant,
526            old_name,
527            name,
528        } => RoomEvent::ParticipantNameChanged {
529            participant: participant_from_livekit(participant),
530            old_name,
531            name,
532        },
533        livekit::RoomEvent::ParticipantAttributesChanged {
534            participant,
535            changed_attributes,
536        } => RoomEvent::ParticipantAttributesChanged {
537            participant: participant_from_livekit(participant),
538            changed_attributes: changed_attributes.into_iter().collect(),
539        },
540        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
541            RoomEvent::ActiveSpeakersChanged {
542                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
543            }
544        }
545        livekit::RoomEvent::Connected {
546            participants_with_tracks,
547        } => RoomEvent::Connected {
548            participants_with_tracks: participants_with_tracks
549                .into_iter()
550                .map({
551                    |(p, t)| {
552                        (
553                            RemoteParticipant(p),
554                            t.into_iter().map(RemoteTrackPublication).collect(),
555                        )
556                    }
557                })
558                .collect(),
559        },
560        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
561            reason: reason.as_str_name(),
562        },
563        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
564        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
565        livekit::RoomEvent::ConnectionQualityChanged {
566            quality,
567            participant,
568        } => RoomEvent::ConnectionQualityChanged {
569            participant: participant_from_livekit(participant),
570            quality: connection_quality_from_livekit(quality),
571        },
572        _ => {
573            log::trace!("dropping livekit event: {:?}", event);
574            return None;
575        }
576    };
577
578    Some(event)
579}