livekit_client.rs

  1use anyhow::{Context as _, Result};
  2use audio::AudioSettings;
  3use collections::HashMap;
  4use futures::{SinkExt, channel::mpsc};
  5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  6use gpui_tokio::Tokio;
  7
  8use playback::capture_local_video_track;
  9use settings::Settings;
 10use std::sync::{Arc, atomic::AtomicU64};
 11
 12#[cfg(target_os = "linux")]
 13mod linux;
 14mod playback;
 15
 16use crate::{ConnectionQuality, LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
 17pub use livekit::SessionStats;
 18pub use livekit::webrtc::stats::RtcStats;
 19pub use playback::AudioStream;
 20pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 21
 22#[derive(Clone, Debug)]
 23pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
 24#[derive(Clone, Debug)]
 25pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
 26#[derive(Clone, Debug)]
 27pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
 28#[derive(Clone, Debug)]
 29pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 30
 31#[derive(Clone, Debug)]
 32pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
 33#[derive(Clone, Debug)]
 34pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
 35#[derive(Clone, Debug)]
 36pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
 37#[derive(Clone, Debug)]
 38pub struct LocalParticipant(livekit::participant::LocalParticipant);
 39
/// A connected LiveKit room together with the resources that back it.
pub struct Room {
    /// The underlying LiveKit room handle.
    room: livekit::Room,
    /// Background task that translates LiveKit events and forwards them to the
    /// receiver returned by `Room::connect`. It is never read; presumably it is
    /// held only so the task lives as long as the room (confirm against `Task`).
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and play remote audio.
    playback: playback::AudioStack,
}
 45
 46pub type TrackSid = livekit::id::TrackSid;
 47pub type ConnectionState = livekit::ConnectionState;
 48#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 49pub struct ParticipantIdentity(pub String);
 50
 51impl Room {
 52    pub async fn connect(
 53        url: String,
 54        token: String,
 55        cx: &mut AsyncApp,
 56    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 57        let mut config = livekit::RoomOptions::default();
 58        config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
 59        let (room, mut events) = Tokio::spawn(cx, async move {
 60            livekit::Room::connect(&url, &token, config).await
 61        })
 62        .await??;
 63
 64        let (mut tx, rx) = mpsc::unbounded();
 65        let task = cx.background_executor().spawn(async move {
 66            while let Some(event) = events.recv().await {
 67                if let Some(event) = room_event_from_livekit(event) {
 68                    tx.send(event).await.ok();
 69                }
 70            }
 71        });
 72
 73        Ok((
 74            Self {
 75                room,
 76                _task: task,
 77                playback: playback::AudioStack::new(cx.background_executor().clone()),
 78            },
 79            rx,
 80        ))
 81    }
 82
 83    pub fn local_participant(&self) -> LocalParticipant {
 84        LocalParticipant(self.room.local_participant())
 85    }
 86
 87    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 88        self.room
 89            .remote_participants()
 90            .into_iter()
 91            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 92            .collect()
 93    }
 94
 95    pub fn connection_state(&self) -> ConnectionState {
 96        self.room.connection_state()
 97    }
 98
 99    pub fn name(&self) -> String {
100        self.room.name()
101    }
102
103    pub async fn sid(&self) -> String {
104        self.room.sid().await.to_string()
105    }
106
107    pub async fn publish_local_microphone_track(
108        &self,
109        user_name: String,
110        is_staff: bool,
111        cx: &mut AsyncApp,
112    ) -> Result<(LocalTrackPublication, playback::AudioStream, Arc<AtomicU64>)> {
113        let (track, stream, input_lag_us) = self
114            .playback
115            .capture_local_microphone_track(user_name, is_staff, &cx)?;
116        let publication = self
117            .local_participant()
118            .publish_track(
119                livekit::track::LocalTrack::Audio(track.0),
120                livekit::options::TrackPublishOptions {
121                    source: livekit::track::TrackSource::Microphone,
122                    ..Default::default()
123                },
124                cx,
125            )
126            .await?;
127
128        Ok((publication, stream, input_lag_us))
129    }
130
131    pub async fn unpublish_local_track(
132        &self,
133        sid: TrackSid,
134        cx: &mut AsyncApp,
135    ) -> Result<LocalTrackPublication> {
136        self.local_participant().unpublish_track(sid, cx).await
137    }
138
139    pub fn play_remote_audio_track(
140        &self,
141        track: &RemoteAudioTrack,
142        cx: &mut App,
143    ) -> Result<playback::AudioStream> {
144        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
145        Ok(self
146            .playback
147            .play_remote_audio_track(&track.0, output_audio_device))
148    }
149
150    pub async fn get_stats(&self) -> Result<livekit::SessionStats> {
151        self.room.get_stats().await.map_err(anyhow::Error::from)
152    }
153
154    /// Returns a `Task` that fetches room stats on the Tokio runtime.
155    ///
156    /// LiveKit's SDK is Tokio-based, so the stats fetch must run within
157    /// a Tokio context rather than on GPUI's smol-based background executor.
158    pub fn stats_task(&self, cx: &impl gpui::AppContext) -> Task<Result<livekit::SessionStats>> {
159        let inner = self.room.clone();
160        Tokio::spawn_result(cx, async move {
161            inner.get_stats().await.map_err(anyhow::Error::from)
162        })
163    }
164}
165
166impl LocalParticipant {
167    pub fn connection_quality(&self) -> ConnectionQuality {
168        connection_quality_from_livekit(self.0.connection_quality())
169    }
170
171    pub fn audio_level(&self) -> f32 {
172        self.0.audio_level()
173    }
174
175    pub async fn publish_screenshare_track(
176        &self,
177        source: &dyn ScreenCaptureSource,
178        cx: &mut AsyncApp,
179    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
180        let (track, stream) = capture_local_video_track(source, cx).await?;
181        let options = livekit::options::TrackPublishOptions {
182            source: livekit::track::TrackSource::Screenshare,
183            video_codec: livekit::options::VideoCodec::VP8,
184            ..Default::default()
185        };
186        let publication = self
187            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
188            .await?;
189
190        Ok((publication, stream))
191    }
192
193    async fn publish_track(
194        &self,
195        track: livekit::track::LocalTrack,
196        options: livekit::options::TrackPublishOptions,
197        cx: &mut AsyncApp,
198    ) -> Result<LocalTrackPublication> {
199        let participant = self.0.clone();
200        Tokio::spawn(cx, async move {
201            participant.publish_track(track, options).await
202        })
203        .await?
204        .map(LocalTrackPublication)
205        .context("publishing a track")
206    }
207
208    pub async fn unpublish_track(
209        &self,
210        sid: TrackSid,
211        cx: &mut AsyncApp,
212    ) -> Result<LocalTrackPublication> {
213        let participant = self.0.clone();
214        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
215            .await?
216            .map(LocalTrackPublication)
217            .context("unpublishing a track")
218    }
219
220    #[cfg(target_os = "linux")]
221    pub async fn publish_screenshare_track_wayland(
222        &self,
223        cx: &mut AsyncApp,
224    ) -> Result<(
225        LocalTrackPublication,
226        Box<dyn ScreenCaptureStream>,
227        futures::channel::oneshot::Receiver<()>,
228    )> {
229        let (track, stop_flag, feed_task, failure_rx) =
230            linux::start_wayland_desktop_capture(cx).await?;
231        let options = livekit::options::TrackPublishOptions {
232            source: livekit::track::TrackSource::Screenshare,
233            video_codec: livekit::options::VideoCodec::VP8,
234            ..Default::default()
235        };
236        let publication = self
237            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
238            .await?;
239
240        Ok((
241            publication,
242            Box::new(linux::WaylandScreenCaptureStream::new(stop_flag, feed_task)),
243            failure_rx,
244        ))
245    }
246}
247
248impl LocalTrackPublication {
249    pub fn mute(&self, cx: &App) {
250        let track = self.0.clone();
251        Tokio::spawn(cx, async move {
252            track.mute();
253        })
254        .detach();
255    }
256
257    pub fn unmute(&self, cx: &App) {
258        let track = self.0.clone();
259        Tokio::spawn(cx, async move {
260            track.unmute();
261        })
262        .detach();
263    }
264
265    pub fn sid(&self) -> TrackSid {
266        self.0.sid()
267    }
268
269    pub fn is_muted(&self) -> bool {
270        self.0.is_muted()
271    }
272}
273
274impl RemoteParticipant {
275    pub fn connection_quality(&self) -> ConnectionQuality {
276        connection_quality_from_livekit(self.0.connection_quality())
277    }
278
279    pub fn audio_level(&self) -> f32 {
280        self.0.audio_level()
281    }
282
283    pub fn identity(&self) -> ParticipantIdentity {
284        ParticipantIdentity(self.0.identity().0)
285    }
286
287    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
288        self.0
289            .track_publications()
290            .into_iter()
291            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
292            .collect()
293    }
294}
295
296impl RemoteAudioTrack {
297    pub fn sid(&self) -> TrackSid {
298        self.0.sid()
299    }
300}
301
302impl RemoteVideoTrack {
303    pub fn sid(&self) -> TrackSid {
304        self.0.sid()
305    }
306}
307
308impl RemoteTrackPublication {
309    pub fn is_muted(&self) -> bool {
310        self.0.is_muted()
311    }
312
313    pub fn is_enabled(&self) -> bool {
314        self.0.is_enabled()
315    }
316
317    pub fn track(&self) -> Option<RemoteTrack> {
318        self.0.track().map(remote_track_from_livekit)
319    }
320
321    pub fn is_audio(&self) -> bool {
322        self.0.kind() == livekit::track::TrackKind::Audio
323    }
324
325    pub fn set_enabled(&self, enabled: bool, cx: &App) {
326        let track = self.0.clone();
327        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
328    }
329
330    pub fn sid(&self) -> TrackSid {
331        self.0.sid()
332    }
333}
334
335impl Participant {
336    pub fn identity(&self) -> ParticipantIdentity {
337        match self {
338            Participant::Local(local_participant) => {
339                ParticipantIdentity(local_participant.0.identity().0)
340            }
341            Participant::Remote(remote_participant) => {
342                ParticipantIdentity(remote_participant.0.identity().0)
343            }
344        }
345    }
346
347    pub fn connection_quality(&self) -> ConnectionQuality {
348        match self {
349            Participant::Local(local_participant) => local_participant.connection_quality(),
350            Participant::Remote(remote_participant) => remote_participant.connection_quality(),
351        }
352    }
353
354    pub fn audio_level(&self) -> f32 {
355        match self {
356            Participant::Local(local_participant) => local_participant.audio_level(),
357            Participant::Remote(remote_participant) => remote_participant.audio_level(),
358        }
359    }
360}
361
362fn connection_quality_from_livekit(
363    quality: livekit::prelude::ConnectionQuality,
364) -> ConnectionQuality {
365    match quality {
366        livekit::prelude::ConnectionQuality::Excellent => ConnectionQuality::Excellent,
367        livekit::prelude::ConnectionQuality::Good => ConnectionQuality::Good,
368        livekit::prelude::ConnectionQuality::Poor => ConnectionQuality::Poor,
369        livekit::prelude::ConnectionQuality::Lost => ConnectionQuality::Lost,
370    }
371}
372
373fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
374    match participant {
375        livekit::participant::Participant::Local(local) => {
376            Participant::Local(LocalParticipant(local))
377        }
378        livekit::participant::Participant::Remote(remote) => {
379            Participant::Remote(RemoteParticipant(remote))
380        }
381    }
382}
383
384fn publication_from_livekit(
385    publication: livekit::publication::TrackPublication,
386) -> TrackPublication {
387    match publication {
388        livekit::publication::TrackPublication::Local(local) => {
389            TrackPublication::Local(LocalTrackPublication(local))
390        }
391        livekit::publication::TrackPublication::Remote(remote) => {
392            TrackPublication::Remote(RemoteTrackPublication(remote))
393        }
394    }
395}
396
397fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
398    match track {
399        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
400        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
401    }
402}
403
404fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
405    match track {
406        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
407        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
408    }
409}
410fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
411    let event = match event {
412        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
413            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
414        }
415        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
416            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
417        }
418        livekit::RoomEvent::LocalTrackPublished {
419            publication,
420            track,
421            participant,
422        } => RoomEvent::LocalTrackPublished {
423            publication: LocalTrackPublication(publication),
424            track: local_track_from_livekit(track),
425            participant: LocalParticipant(participant),
426        },
427        livekit::RoomEvent::LocalTrackUnpublished {
428            publication,
429            participant,
430        } => RoomEvent::LocalTrackUnpublished {
431            publication: LocalTrackPublication(publication),
432            participant: LocalParticipant(participant),
433        },
434        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
435            track: local_track_from_livekit(track),
436        },
437        livekit::RoomEvent::TrackSubscribed {
438            track,
439            publication,
440            participant,
441        } => RoomEvent::TrackSubscribed {
442            track: remote_track_from_livekit(track),
443            publication: RemoteTrackPublication(publication),
444            participant: RemoteParticipant(participant),
445        },
446        livekit::RoomEvent::TrackUnsubscribed {
447            track,
448            publication,
449            participant,
450        } => RoomEvent::TrackUnsubscribed {
451            track: remote_track_from_livekit(track),
452            publication: RemoteTrackPublication(publication),
453            participant: RemoteParticipant(participant),
454        },
455        livekit::RoomEvent::TrackSubscriptionFailed {
456            participant,
457            error: _,
458            track_sid,
459        } => RoomEvent::TrackSubscriptionFailed {
460            participant: RemoteParticipant(participant),
461            track_sid,
462        },
463        livekit::RoomEvent::TrackPublished {
464            publication,
465            participant,
466        } => RoomEvent::TrackPublished {
467            publication: RemoteTrackPublication(publication),
468            participant: RemoteParticipant(participant),
469        },
470        livekit::RoomEvent::TrackUnpublished {
471            publication,
472            participant,
473        } => RoomEvent::TrackUnpublished {
474            publication: RemoteTrackPublication(publication),
475            participant: RemoteParticipant(participant),
476        },
477        livekit::RoomEvent::TrackMuted {
478            participant,
479            publication,
480        } => RoomEvent::TrackMuted {
481            publication: publication_from_livekit(publication),
482            participant: participant_from_livekit(participant),
483        },
484        livekit::RoomEvent::TrackUnmuted {
485            participant,
486            publication,
487        } => RoomEvent::TrackUnmuted {
488            publication: publication_from_livekit(publication),
489            participant: participant_from_livekit(participant),
490        },
491        livekit::RoomEvent::RoomMetadataChanged {
492            old_metadata,
493            metadata,
494        } => RoomEvent::RoomMetadataChanged {
495            old_metadata,
496            metadata,
497        },
498        livekit::RoomEvent::ParticipantMetadataChanged {
499            participant,
500            old_metadata,
501            metadata,
502        } => RoomEvent::ParticipantMetadataChanged {
503            participant: participant_from_livekit(participant),
504            old_metadata,
505            metadata,
506        },
507        livekit::RoomEvent::ParticipantNameChanged {
508            participant,
509            old_name,
510            name,
511        } => RoomEvent::ParticipantNameChanged {
512            participant: participant_from_livekit(participant),
513            old_name,
514            name,
515        },
516        livekit::RoomEvent::ParticipantAttributesChanged {
517            participant,
518            changed_attributes,
519        } => RoomEvent::ParticipantAttributesChanged {
520            participant: participant_from_livekit(participant),
521            changed_attributes: changed_attributes.into_iter().collect(),
522        },
523        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
524            RoomEvent::ActiveSpeakersChanged {
525                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
526            }
527        }
528        livekit::RoomEvent::Connected {
529            participants_with_tracks,
530        } => RoomEvent::Connected {
531            participants_with_tracks: participants_with_tracks
532                .into_iter()
533                .map({
534                    |(p, t)| {
535                        (
536                            RemoteParticipant(p),
537                            t.into_iter().map(RemoteTrackPublication).collect(),
538                        )
539                    }
540                })
541                .collect(),
542        },
543        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
544            reason: reason.as_str_name(),
545        },
546        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
547        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
548        livekit::RoomEvent::ConnectionQualityChanged {
549            quality,
550            participant,
551        } => RoomEvent::ConnectionQualityChanged {
552            participant: participant_from_livekit(participant),
553            quality: connection_quality_from_livekit(quality),
554        },
555        _ => {
556            log::trace!("dropping livekit event: {:?}", event);
557            return None;
558        }
559    };
560
561    Some(event)
562}