livekit_client.rs

  1use std::sync::Arc;
  2
  3use anyhow::{Context as _, Result, anyhow};
  4use audio::AudioSettings;
  5use collections::HashMap;
  6use futures::{SinkExt, channel::mpsc};
  7use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  8use gpui_tokio::Tokio;
  9use log::info;
 10use playback::capture_local_video_track;
 11use settings::Settings;
 12
 13mod playback;
 14
 15use crate::{
 16    LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
 17    livekit_client::playback::Speaker,
 18};
 19pub use playback::AudioStream;
 20pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 21
/// Newtype wrapper around `livekit::track::RemoteVideoTrack`.
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype wrapper around `livekit::track::RemoteAudioTrack`.
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype wrapper around `livekit::publication::RemoteTrackPublication`.
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype wrapper around `livekit::participant::RemoteParticipant`.
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 30
/// Newtype wrapper around `livekit::track::LocalVideoTrack`.
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype wrapper around `livekit::track::LocalAudioTrack`.
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype wrapper around `livekit::publication::LocalTrackPublication`.
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype wrapper around `livekit::participant::LocalParticipant`.
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
 39
/// A connected LiveKit room together with the resources it keeps alive.
pub struct Room {
    /// The underlying livekit room handle.
    room: livekit::Room,
    /// Background task created in `Room::connect` that forwards converted
    /// livekit events to the receiver handed back to the caller. It is only
    /// stored here, never read.
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and to play remote
    /// audio tracks.
    playback: playback::AudioStack,
}
 45
/// Alias exposing `livekit::id::TrackSid` under this module's name.
pub type TrackSid = livekit::id::TrackSid;
/// Alias exposing `livekit::ConnectionState` under this module's name.
pub type ConnectionState = livekit::ConnectionState;
/// Identity of a participant, stored as a plain string. In this file it is
/// built from the inner string of livekit's participant identity.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
 50
 51impl Room {
 52    pub async fn connect(
 53        url: String,
 54        token: String,
 55        cx: &mut AsyncApp,
 56    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 57        let connector =
 58            tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
 59        let mut config = livekit::RoomOptions::default();
 60        config.connector = Some(connector);
 61        let (room, mut events) = Tokio::spawn(cx, async move {
 62            livekit::Room::connect(&url, &token, config).await
 63        })?
 64        .await??;
 65
 66        let (mut tx, rx) = mpsc::unbounded();
 67        let task = cx.background_executor().spawn(async move {
 68            while let Some(event) = events.recv().await {
 69                if let Some(event) = room_event_from_livekit(event) {
 70                    tx.send(event).await.ok();
 71                }
 72            }
 73        });
 74
 75        Ok((
 76            Self {
 77                room,
 78                _task: task,
 79                playback: playback::AudioStack::new(cx.background_executor().clone()),
 80            },
 81            rx,
 82        ))
 83    }
 84
 85    pub fn local_participant(&self) -> LocalParticipant {
 86        LocalParticipant(self.room.local_participant())
 87    }
 88
 89    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 90        self.room
 91            .remote_participants()
 92            .into_iter()
 93            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 94            .collect()
 95    }
 96
 97    pub fn connection_state(&self) -> ConnectionState {
 98        self.room.connection_state()
 99    }
100
101    pub fn name(&self) -> String {
102        self.room.name()
103    }
104
105    pub async fn sid(&self) -> String {
106        self.room.sid().await.to_string()
107    }
108
109    pub async fn publish_local_microphone_track(
110        &self,
111        user_name: String,
112        is_staff: bool,
113        cx: &mut AsyncApp,
114    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
115        let (track, stream) = self
116            .playback
117            .capture_local_microphone_track(user_name, is_staff, &cx)?;
118        let publication = self
119            .local_participant()
120            .publish_track(
121                livekit::track::LocalTrack::Audio(track.0),
122                livekit::options::TrackPublishOptions {
123                    source: livekit::track::TrackSource::Microphone,
124                    ..Default::default()
125                },
126                cx,
127            )
128            .await?;
129
130        Ok((publication, stream))
131    }
132
133    pub async fn unpublish_local_track(
134        &self,
135        sid: TrackSid,
136        cx: &mut AsyncApp,
137    ) -> Result<LocalTrackPublication> {
138        self.local_participant().unpublish_track(sid, cx).await
139    }
140
141    pub fn play_remote_audio_track(
142        &self,
143        track: &RemoteAudioTrack,
144        cx: &mut App,
145    ) -> Result<playback::AudioStream> {
146        let speaker: Speaker =
147            serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
148                name: track.0.name(),
149                is_staff: false,
150                sends_legacy_audio: true,
151            });
152
153        if AudioSettings::get_global(cx).rodio_audio {
154            info!("Using experimental.rodio_audio audio pipeline for output");
155            playback::play_remote_audio_track(&track.0, speaker, cx)
156        } else if speaker.sends_legacy_audio {
157            Ok(self.playback.play_remote_audio_track(&track.0))
158        } else {
159            Err(anyhow!("Client version too old to play audio in call"))
160        }
161    }
162}
163
164impl LocalParticipant {
165    pub async fn publish_screenshare_track(
166        &self,
167        source: &dyn ScreenCaptureSource,
168        cx: &mut AsyncApp,
169    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
170        let (track, stream) = capture_local_video_track(source, cx).await?;
171        let options = livekit::options::TrackPublishOptions {
172            source: livekit::track::TrackSource::Screenshare,
173            video_codec: livekit::options::VideoCodec::VP8,
174            ..Default::default()
175        };
176        let publication = self
177            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
178            .await?;
179
180        Ok((publication, stream))
181    }
182
183    async fn publish_track(
184        &self,
185        track: livekit::track::LocalTrack,
186        options: livekit::options::TrackPublishOptions,
187        cx: &mut AsyncApp,
188    ) -> Result<LocalTrackPublication> {
189        let participant = self.0.clone();
190        Tokio::spawn(cx, async move {
191            participant.publish_track(track, options).await
192        })?
193        .await?
194        .map(LocalTrackPublication)
195        .context("publishing a track")
196    }
197
198    pub async fn unpublish_track(
199        &self,
200        sid: TrackSid,
201        cx: &mut AsyncApp,
202    ) -> Result<LocalTrackPublication> {
203        let participant = self.0.clone();
204        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
205            .await?
206            .map(LocalTrackPublication)
207            .context("unpublishing a track")
208    }
209}
210
211impl LocalTrackPublication {
212    pub fn mute(&self, cx: &App) {
213        let track = self.0.clone();
214        Tokio::spawn(cx, async move {
215            track.mute();
216        })
217        .detach();
218    }
219
220    pub fn unmute(&self, cx: &App) {
221        let track = self.0.clone();
222        Tokio::spawn(cx, async move {
223            track.unmute();
224        })
225        .detach();
226    }
227
228    pub fn sid(&self) -> TrackSid {
229        self.0.sid()
230    }
231
232    pub fn is_muted(&self) -> bool {
233        self.0.is_muted()
234    }
235}
236
237impl RemoteParticipant {
238    pub fn identity(&self) -> ParticipantIdentity {
239        ParticipantIdentity(self.0.identity().0)
240    }
241
242    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
243        self.0
244            .track_publications()
245            .into_iter()
246            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
247            .collect()
248    }
249}
250
251impl RemoteAudioTrack {
252    pub fn sid(&self) -> TrackSid {
253        self.0.sid()
254    }
255}
256
257impl RemoteVideoTrack {
258    pub fn sid(&self) -> TrackSid {
259        self.0.sid()
260    }
261}
262
263impl RemoteTrackPublication {
264    pub fn is_muted(&self) -> bool {
265        self.0.is_muted()
266    }
267
268    pub fn is_enabled(&self) -> bool {
269        self.0.is_enabled()
270    }
271
272    pub fn track(&self) -> Option<RemoteTrack> {
273        self.0.track().map(remote_track_from_livekit)
274    }
275
276    pub fn is_audio(&self) -> bool {
277        self.0.kind() == livekit::track::TrackKind::Audio
278    }
279
280    pub fn set_enabled(&self, enabled: bool, cx: &App) {
281        let track = self.0.clone();
282        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
283    }
284
285    pub fn sid(&self) -> TrackSid {
286        self.0.sid()
287    }
288}
289
290impl Participant {
291    pub fn identity(&self) -> ParticipantIdentity {
292        match self {
293            Participant::Local(local_participant) => {
294                ParticipantIdentity(local_participant.0.identity().0)
295            }
296            Participant::Remote(remote_participant) => {
297                ParticipantIdentity(remote_participant.0.identity().0)
298            }
299        }
300    }
301}
302
303fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
304    match participant {
305        livekit::participant::Participant::Local(local) => {
306            Participant::Local(LocalParticipant(local))
307        }
308        livekit::participant::Participant::Remote(remote) => {
309            Participant::Remote(RemoteParticipant(remote))
310        }
311    }
312}
313
314fn publication_from_livekit(
315    publication: livekit::publication::TrackPublication,
316) -> TrackPublication {
317    match publication {
318        livekit::publication::TrackPublication::Local(local) => {
319            TrackPublication::Local(LocalTrackPublication(local))
320        }
321        livekit::publication::TrackPublication::Remote(remote) => {
322            TrackPublication::Remote(RemoteTrackPublication(remote))
323        }
324    }
325}
326
327fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
328    match track {
329        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
330        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
331    }
332}
333
334fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
335    match track {
336        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
337        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
338    }
339}
340fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
341    let event = match event {
342        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
343            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
344        }
345        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
346            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
347        }
348        livekit::RoomEvent::LocalTrackPublished {
349            publication,
350            track,
351            participant,
352        } => RoomEvent::LocalTrackPublished {
353            publication: LocalTrackPublication(publication),
354            track: local_track_from_livekit(track),
355            participant: LocalParticipant(participant),
356        },
357        livekit::RoomEvent::LocalTrackUnpublished {
358            publication,
359            participant,
360        } => RoomEvent::LocalTrackUnpublished {
361            publication: LocalTrackPublication(publication),
362            participant: LocalParticipant(participant),
363        },
364        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
365            track: local_track_from_livekit(track),
366        },
367        livekit::RoomEvent::TrackSubscribed {
368            track,
369            publication,
370            participant,
371        } => RoomEvent::TrackSubscribed {
372            track: remote_track_from_livekit(track),
373            publication: RemoteTrackPublication(publication),
374            participant: RemoteParticipant(participant),
375        },
376        livekit::RoomEvent::TrackUnsubscribed {
377            track,
378            publication,
379            participant,
380        } => RoomEvent::TrackUnsubscribed {
381            track: remote_track_from_livekit(track),
382            publication: RemoteTrackPublication(publication),
383            participant: RemoteParticipant(participant),
384        },
385        livekit::RoomEvent::TrackSubscriptionFailed {
386            participant,
387            error: _,
388            track_sid,
389        } => RoomEvent::TrackSubscriptionFailed {
390            participant: RemoteParticipant(participant),
391            track_sid,
392        },
393        livekit::RoomEvent::TrackPublished {
394            publication,
395            participant,
396        } => RoomEvent::TrackPublished {
397            publication: RemoteTrackPublication(publication),
398            participant: RemoteParticipant(participant),
399        },
400        livekit::RoomEvent::TrackUnpublished {
401            publication,
402            participant,
403        } => RoomEvent::TrackUnpublished {
404            publication: RemoteTrackPublication(publication),
405            participant: RemoteParticipant(participant),
406        },
407        livekit::RoomEvent::TrackMuted {
408            participant,
409            publication,
410        } => RoomEvent::TrackMuted {
411            publication: publication_from_livekit(publication),
412            participant: participant_from_livekit(participant),
413        },
414        livekit::RoomEvent::TrackUnmuted {
415            participant,
416            publication,
417        } => RoomEvent::TrackUnmuted {
418            publication: publication_from_livekit(publication),
419            participant: participant_from_livekit(participant),
420        },
421        livekit::RoomEvent::RoomMetadataChanged {
422            old_metadata,
423            metadata,
424        } => RoomEvent::RoomMetadataChanged {
425            old_metadata,
426            metadata,
427        },
428        livekit::RoomEvent::ParticipantMetadataChanged {
429            participant,
430            old_metadata,
431            metadata,
432        } => RoomEvent::ParticipantMetadataChanged {
433            participant: participant_from_livekit(participant),
434            old_metadata,
435            metadata,
436        },
437        livekit::RoomEvent::ParticipantNameChanged {
438            participant,
439            old_name,
440            name,
441        } => RoomEvent::ParticipantNameChanged {
442            participant: participant_from_livekit(participant),
443            old_name,
444            name,
445        },
446        livekit::RoomEvent::ParticipantAttributesChanged {
447            participant,
448            changed_attributes,
449        } => RoomEvent::ParticipantAttributesChanged {
450            participant: participant_from_livekit(participant),
451            changed_attributes: changed_attributes.into_iter().collect(),
452        },
453        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
454            RoomEvent::ActiveSpeakersChanged {
455                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
456            }
457        }
458        livekit::RoomEvent::Connected {
459            participants_with_tracks,
460        } => RoomEvent::Connected {
461            participants_with_tracks: participants_with_tracks
462                .into_iter()
463                .map({
464                    |(p, t)| {
465                        (
466                            RemoteParticipant(p),
467                            t.into_iter().map(RemoteTrackPublication).collect(),
468                        )
469                    }
470                })
471                .collect(),
472        },
473        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
474            reason: reason.as_str_name(),
475        },
476        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
477        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
478        _ => {
479            log::trace!("dropping livekit event: {:?}", event);
480            return None;
481        }
482    };
483
484    Some(event)
485}