livekit_client.rs

  1use anyhow::{Context as _, Result, anyhow};
  2use audio::AudioSettings;
  3use collections::HashMap;
  4use futures::{SinkExt, channel::mpsc};
  5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  6use gpui_tokio::Tokio;
  7use log::info;
  8use playback::capture_local_video_track;
  9use settings::Settings;
 10
 11mod playback;
 12
 13use crate::{
 14    LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
 15    livekit_client::playback::Speaker,
 16};
 17pub use playback::AudioStream;
 18pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 19
 20#[derive(Clone, Debug)]
 21pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
 22#[derive(Clone, Debug)]
 23pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
 24#[derive(Clone, Debug)]
 25pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
 26#[derive(Clone, Debug)]
 27pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 28
 29#[derive(Clone, Debug)]
 30pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
 31#[derive(Clone, Debug)]
 32pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
 33#[derive(Clone, Debug)]
 34pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
 35#[derive(Clone, Debug)]
 36pub struct LocalParticipant(livekit::participant::LocalParticipant);
 37
 38pub struct Room {
 39    room: livekit::Room,
 40    _task: Task<()>,
 41    playback: playback::AudioStack,
 42}
 43
 44pub type TrackSid = livekit::id::TrackSid;
 45pub type ConnectionState = livekit::ConnectionState;
 46#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 47pub struct ParticipantIdentity(pub String);
 48
 49impl Room {
 50    pub async fn connect(
 51        url: String,
 52        token: String,
 53        cx: &mut AsyncApp,
 54    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 55        let mut config = livekit::RoomOptions::default();
 56        config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
 57        let (room, mut events) = Tokio::spawn(cx, async move {
 58            livekit::Room::connect(&url, &token, config).await
 59        })
 60        .await??;
 61
 62        let (mut tx, rx) = mpsc::unbounded();
 63        let task = cx.background_executor().spawn(async move {
 64            while let Some(event) = events.recv().await {
 65                if let Some(event) = room_event_from_livekit(event) {
 66                    tx.send(event).await.ok();
 67                }
 68            }
 69        });
 70
 71        Ok((
 72            Self {
 73                room,
 74                _task: task,
 75                playback: playback::AudioStack::new(cx.background_executor().clone()),
 76            },
 77            rx,
 78        ))
 79    }
 80
 81    pub fn local_participant(&self) -> LocalParticipant {
 82        LocalParticipant(self.room.local_participant())
 83    }
 84
 85    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 86        self.room
 87            .remote_participants()
 88            .into_iter()
 89            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 90            .collect()
 91    }
 92
 93    pub fn connection_state(&self) -> ConnectionState {
 94        self.room.connection_state()
 95    }
 96
 97    pub fn name(&self) -> String {
 98        self.room.name()
 99    }
100
101    pub async fn sid(&self) -> String {
102        self.room.sid().await.to_string()
103    }
104
105    pub async fn publish_local_microphone_track(
106        &self,
107        user_name: String,
108        is_staff: bool,
109        cx: &mut AsyncApp,
110    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
111        let (track, stream) = self
112            .playback
113            .capture_local_microphone_track(user_name, is_staff, &cx)?;
114        let publication = self
115            .local_participant()
116            .publish_track(
117                livekit::track::LocalTrack::Audio(track.0),
118                livekit::options::TrackPublishOptions {
119                    source: livekit::track::TrackSource::Microphone,
120                    ..Default::default()
121                },
122                cx,
123            )
124            .await?;
125
126        Ok((publication, stream))
127    }
128
129    pub async fn unpublish_local_track(
130        &self,
131        sid: TrackSid,
132        cx: &mut AsyncApp,
133    ) -> Result<LocalTrackPublication> {
134        self.local_participant().unpublish_track(sid, cx).await
135    }
136
137    pub fn play_remote_audio_track(
138        &self,
139        track: &RemoteAudioTrack,
140        cx: &mut App,
141    ) -> Result<playback::AudioStream> {
142        let speaker: Speaker =
143            serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
144                name: track.0.name(),
145                is_staff: false,
146                sends_legacy_audio: true,
147            });
148
149        if AudioSettings::get_global(cx).rodio_audio {
150            info!("Using experimental.rodio_audio audio pipeline for output");
151            playback::play_remote_audio_track(&track.0, speaker, cx)
152        } else if speaker.sends_legacy_audio {
153            let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
154            Ok(self
155                .playback
156                .play_remote_audio_track(&track.0, output_audio_device))
157        } else {
158            Err(anyhow!("Client version too old to play audio in call"))
159        }
160    }
161}
162
163impl LocalParticipant {
164    pub async fn publish_screenshare_track(
165        &self,
166        source: &dyn ScreenCaptureSource,
167        cx: &mut AsyncApp,
168    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
169        let (track, stream) = capture_local_video_track(source, cx).await?;
170        let options = livekit::options::TrackPublishOptions {
171            source: livekit::track::TrackSource::Screenshare,
172            video_codec: livekit::options::VideoCodec::VP8,
173            ..Default::default()
174        };
175        let publication = self
176            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
177            .await?;
178
179        Ok((publication, stream))
180    }
181
182    async fn publish_track(
183        &self,
184        track: livekit::track::LocalTrack,
185        options: livekit::options::TrackPublishOptions,
186        cx: &mut AsyncApp,
187    ) -> Result<LocalTrackPublication> {
188        let participant = self.0.clone();
189        Tokio::spawn(cx, async move {
190            participant.publish_track(track, options).await
191        })
192        .await?
193        .map(LocalTrackPublication)
194        .context("publishing a track")
195    }
196
197    pub async fn unpublish_track(
198        &self,
199        sid: TrackSid,
200        cx: &mut AsyncApp,
201    ) -> Result<LocalTrackPublication> {
202        let participant = self.0.clone();
203        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
204            .await?
205            .map(LocalTrackPublication)
206            .context("unpublishing a track")
207    }
208}
209
210impl LocalTrackPublication {
211    pub fn mute(&self, cx: &App) {
212        let track = self.0.clone();
213        Tokio::spawn(cx, async move {
214            track.mute();
215        })
216        .detach();
217    }
218
219    pub fn unmute(&self, cx: &App) {
220        let track = self.0.clone();
221        Tokio::spawn(cx, async move {
222            track.unmute();
223        })
224        .detach();
225    }
226
227    pub fn sid(&self) -> TrackSid {
228        self.0.sid()
229    }
230
231    pub fn is_muted(&self) -> bool {
232        self.0.is_muted()
233    }
234}
235
236impl RemoteParticipant {
237    pub fn identity(&self) -> ParticipantIdentity {
238        ParticipantIdentity(self.0.identity().0)
239    }
240
241    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
242        self.0
243            .track_publications()
244            .into_iter()
245            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
246            .collect()
247    }
248}
249
250impl RemoteAudioTrack {
251    pub fn sid(&self) -> TrackSid {
252        self.0.sid()
253    }
254}
255
256impl RemoteVideoTrack {
257    pub fn sid(&self) -> TrackSid {
258        self.0.sid()
259    }
260}
261
262impl RemoteTrackPublication {
263    pub fn is_muted(&self) -> bool {
264        self.0.is_muted()
265    }
266
267    pub fn is_enabled(&self) -> bool {
268        self.0.is_enabled()
269    }
270
271    pub fn track(&self) -> Option<RemoteTrack> {
272        self.0.track().map(remote_track_from_livekit)
273    }
274
275    pub fn is_audio(&self) -> bool {
276        self.0.kind() == livekit::track::TrackKind::Audio
277    }
278
279    pub fn set_enabled(&self, enabled: bool, cx: &App) {
280        let track = self.0.clone();
281        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
282    }
283
284    pub fn sid(&self) -> TrackSid {
285        self.0.sid()
286    }
287}
288
289impl Participant {
290    pub fn identity(&self) -> ParticipantIdentity {
291        match self {
292            Participant::Local(local_participant) => {
293                ParticipantIdentity(local_participant.0.identity().0)
294            }
295            Participant::Remote(remote_participant) => {
296                ParticipantIdentity(remote_participant.0.identity().0)
297            }
298        }
299    }
300}
301
302fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
303    match participant {
304        livekit::participant::Participant::Local(local) => {
305            Participant::Local(LocalParticipant(local))
306        }
307        livekit::participant::Participant::Remote(remote) => {
308            Participant::Remote(RemoteParticipant(remote))
309        }
310    }
311}
312
313fn publication_from_livekit(
314    publication: livekit::publication::TrackPublication,
315) -> TrackPublication {
316    match publication {
317        livekit::publication::TrackPublication::Local(local) => {
318            TrackPublication::Local(LocalTrackPublication(local))
319        }
320        livekit::publication::TrackPublication::Remote(remote) => {
321            TrackPublication::Remote(RemoteTrackPublication(remote))
322        }
323    }
324}
325
326fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
327    match track {
328        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
329        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
330    }
331}
332
333fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
334    match track {
335        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
336        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
337    }
338}
339fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
340    let event = match event {
341        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
342            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
343        }
344        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
345            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
346        }
347        livekit::RoomEvent::LocalTrackPublished {
348            publication,
349            track,
350            participant,
351        } => RoomEvent::LocalTrackPublished {
352            publication: LocalTrackPublication(publication),
353            track: local_track_from_livekit(track),
354            participant: LocalParticipant(participant),
355        },
356        livekit::RoomEvent::LocalTrackUnpublished {
357            publication,
358            participant,
359        } => RoomEvent::LocalTrackUnpublished {
360            publication: LocalTrackPublication(publication),
361            participant: LocalParticipant(participant),
362        },
363        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
364            track: local_track_from_livekit(track),
365        },
366        livekit::RoomEvent::TrackSubscribed {
367            track,
368            publication,
369            participant,
370        } => RoomEvent::TrackSubscribed {
371            track: remote_track_from_livekit(track),
372            publication: RemoteTrackPublication(publication),
373            participant: RemoteParticipant(participant),
374        },
375        livekit::RoomEvent::TrackUnsubscribed {
376            track,
377            publication,
378            participant,
379        } => RoomEvent::TrackUnsubscribed {
380            track: remote_track_from_livekit(track),
381            publication: RemoteTrackPublication(publication),
382            participant: RemoteParticipant(participant),
383        },
384        livekit::RoomEvent::TrackSubscriptionFailed {
385            participant,
386            error: _,
387            track_sid,
388        } => RoomEvent::TrackSubscriptionFailed {
389            participant: RemoteParticipant(participant),
390            track_sid,
391        },
392        livekit::RoomEvent::TrackPublished {
393            publication,
394            participant,
395        } => RoomEvent::TrackPublished {
396            publication: RemoteTrackPublication(publication),
397            participant: RemoteParticipant(participant),
398        },
399        livekit::RoomEvent::TrackUnpublished {
400            publication,
401            participant,
402        } => RoomEvent::TrackUnpublished {
403            publication: RemoteTrackPublication(publication),
404            participant: RemoteParticipant(participant),
405        },
406        livekit::RoomEvent::TrackMuted {
407            participant,
408            publication,
409        } => RoomEvent::TrackMuted {
410            publication: publication_from_livekit(publication),
411            participant: participant_from_livekit(participant),
412        },
413        livekit::RoomEvent::TrackUnmuted {
414            participant,
415            publication,
416        } => RoomEvent::TrackUnmuted {
417            publication: publication_from_livekit(publication),
418            participant: participant_from_livekit(participant),
419        },
420        livekit::RoomEvent::RoomMetadataChanged {
421            old_metadata,
422            metadata,
423        } => RoomEvent::RoomMetadataChanged {
424            old_metadata,
425            metadata,
426        },
427        livekit::RoomEvent::ParticipantMetadataChanged {
428            participant,
429            old_metadata,
430            metadata,
431        } => RoomEvent::ParticipantMetadataChanged {
432            participant: participant_from_livekit(participant),
433            old_metadata,
434            metadata,
435        },
436        livekit::RoomEvent::ParticipantNameChanged {
437            participant,
438            old_name,
439            name,
440        } => RoomEvent::ParticipantNameChanged {
441            participant: participant_from_livekit(participant),
442            old_name,
443            name,
444        },
445        livekit::RoomEvent::ParticipantAttributesChanged {
446            participant,
447            changed_attributes,
448        } => RoomEvent::ParticipantAttributesChanged {
449            participant: participant_from_livekit(participant),
450            changed_attributes: changed_attributes.into_iter().collect(),
451        },
452        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
453            RoomEvent::ActiveSpeakersChanged {
454                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
455            }
456        }
457        livekit::RoomEvent::Connected {
458            participants_with_tracks,
459        } => RoomEvent::Connected {
460            participants_with_tracks: participants_with_tracks
461                .into_iter()
462                .map({
463                    |(p, t)| {
464                        (
465                            RemoteParticipant(p),
466                            t.into_iter().map(RemoteTrackPublication).collect(),
467                        )
468                    }
469                })
470                .collect(),
471        },
472        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
473            reason: reason.as_str_name(),
474        },
475        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
476        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
477        _ => {
478            log::trace!("dropping livekit event: {:?}", event);
479            return None;
480        }
481    };
482
483    Some(event)
484}