livekit_client.rs

  1use anyhow::{Context as _, Result, anyhow};
  2use audio::AudioSettings;
  3use collections::HashMap;
  4use futures::{SinkExt, channel::mpsc};
  5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  6use gpui_tokio::Tokio;
  7use log::info;
  8use playback::capture_local_video_track;
  9use settings::Settings;
 10
 11mod playback;
 12
 13use crate::{
 14    LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
 15    livekit_client::playback::Speaker,
 16};
 17pub use playback::AudioStream;
 18pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 19
/// Newtype wrapper around the LiveKit SDK's `livekit::track::RemoteVideoTrack`.
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype wrapper around the LiveKit SDK's `livekit::track::RemoteAudioTrack`.
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype wrapper around the LiveKit SDK's `livekit::publication::RemoteTrackPublication`.
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype wrapper around the LiveKit SDK's `livekit::participant::RemoteParticipant`.
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 28
/// Newtype wrapper around the LiveKit SDK's `livekit::track::LocalVideoTrack`.
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype wrapper around the LiveKit SDK's `livekit::track::LocalAudioTrack`.
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype wrapper around the LiveKit SDK's `livekit::publication::LocalTrackPublication`.
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype wrapper around the LiveKit SDK's `livekit::participant::LocalParticipant`.
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
 37
/// A LiveKit room plus the resources this module keeps alongside it.
pub struct Room {
    /// The underlying LiveKit room handle.
    room: livekit::Room,
    /// Background task that forwards converted room events to the receiver handed out by
    /// `Room::connect`; stored here so that it is owned by the room.
    // NOTE(review): presumably dropping this task cancels the forwarding — confirm against gpui's `Task`.
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and to play remote audio tracks.
    playback: playback::AudioStack,
}
 43
/// Alias for the LiveKit SDK's track SID type.
pub type TrackSid = livekit::id::TrackSid;
/// Alias for the LiveKit SDK's connection state type.
pub type ConnectionState = livekit::ConnectionState;
/// Identity of a participant, stored as a plain string.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
 48
 49impl Room {
 50    pub async fn connect(
 51        url: String,
 52        token: String,
 53        cx: &mut AsyncApp,
 54    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 55        let mut config = livekit::RoomOptions::default();
 56        config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
 57        let (room, mut events) = Tokio::spawn(cx, async move {
 58            livekit::Room::connect(&url, &token, config).await
 59        })
 60        .await??;
 61
 62        let (mut tx, rx) = mpsc::unbounded();
 63        let task = cx.background_executor().spawn(async move {
 64            while let Some(event) = events.recv().await {
 65                if let Some(event) = room_event_from_livekit(event) {
 66                    tx.send(event).await.ok();
 67                }
 68            }
 69        });
 70
 71        Ok((
 72            Self {
 73                room,
 74                _task: task,
 75                playback: playback::AudioStack::new(cx.background_executor().clone()),
 76            },
 77            rx,
 78        ))
 79    }
 80
 81    pub fn local_participant(&self) -> LocalParticipant {
 82        LocalParticipant(self.room.local_participant())
 83    }
 84
 85    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 86        self.room
 87            .remote_participants()
 88            .into_iter()
 89            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 90            .collect()
 91    }
 92
 93    pub fn connection_state(&self) -> ConnectionState {
 94        self.room.connection_state()
 95    }
 96
 97    pub fn name(&self) -> String {
 98        self.room.name()
 99    }
100
101    pub async fn sid(&self) -> String {
102        self.room.sid().await.to_string()
103    }
104
105    pub async fn publish_local_microphone_track(
106        &self,
107        user_name: String,
108        is_staff: bool,
109        cx: &mut AsyncApp,
110    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
111        let (track, stream) = self
112            .playback
113            .capture_local_microphone_track(user_name, is_staff, &cx)?;
114        let publication = self
115            .local_participant()
116            .publish_track(
117                livekit::track::LocalTrack::Audio(track.0),
118                livekit::options::TrackPublishOptions {
119                    source: livekit::track::TrackSource::Microphone,
120                    ..Default::default()
121                },
122                cx,
123            )
124            .await?;
125
126        Ok((publication, stream))
127    }
128
129    pub async fn unpublish_local_track(
130        &self,
131        sid: TrackSid,
132        cx: &mut AsyncApp,
133    ) -> Result<LocalTrackPublication> {
134        self.local_participant().unpublish_track(sid, cx).await
135    }
136
137    pub fn play_remote_audio_track(
138        &self,
139        track: &RemoteAudioTrack,
140        cx: &mut App,
141    ) -> Result<playback::AudioStream> {
142        let speaker: Speaker =
143            serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
144                name: track.0.name(),
145                is_staff: false,
146                sends_legacy_audio: true,
147            });
148
149        if AudioSettings::get_global(cx).rodio_audio {
150            info!("Using experimental.rodio_audio audio pipeline for output");
151            playback::play_remote_audio_track(&track.0, speaker, cx)
152        } else if speaker.sends_legacy_audio {
153            Ok(self.playback.play_remote_audio_track(&track.0))
154        } else {
155            Err(anyhow!("Client version too old to play audio in call"))
156        }
157    }
158}
159
160impl LocalParticipant {
161    pub async fn publish_screenshare_track(
162        &self,
163        source: &dyn ScreenCaptureSource,
164        cx: &mut AsyncApp,
165    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
166        let (track, stream) = capture_local_video_track(source, cx).await?;
167        let options = livekit::options::TrackPublishOptions {
168            source: livekit::track::TrackSource::Screenshare,
169            video_codec: livekit::options::VideoCodec::VP8,
170            ..Default::default()
171        };
172        let publication = self
173            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
174            .await?;
175
176        Ok((publication, stream))
177    }
178
179    async fn publish_track(
180        &self,
181        track: livekit::track::LocalTrack,
182        options: livekit::options::TrackPublishOptions,
183        cx: &mut AsyncApp,
184    ) -> Result<LocalTrackPublication> {
185        let participant = self.0.clone();
186        Tokio::spawn(cx, async move {
187            participant.publish_track(track, options).await
188        })
189        .await?
190        .map(LocalTrackPublication)
191        .context("publishing a track")
192    }
193
194    pub async fn unpublish_track(
195        &self,
196        sid: TrackSid,
197        cx: &mut AsyncApp,
198    ) -> Result<LocalTrackPublication> {
199        let participant = self.0.clone();
200        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
201            .await?
202            .map(LocalTrackPublication)
203            .context("unpublishing a track")
204    }
205}
206
207impl LocalTrackPublication {
208    pub fn mute(&self, cx: &App) {
209        let track = self.0.clone();
210        Tokio::spawn(cx, async move {
211            track.mute();
212        })
213        .detach();
214    }
215
216    pub fn unmute(&self, cx: &App) {
217        let track = self.0.clone();
218        Tokio::spawn(cx, async move {
219            track.unmute();
220        })
221        .detach();
222    }
223
224    pub fn sid(&self) -> TrackSid {
225        self.0.sid()
226    }
227
228    pub fn is_muted(&self) -> bool {
229        self.0.is_muted()
230    }
231}
232
233impl RemoteParticipant {
234    pub fn identity(&self) -> ParticipantIdentity {
235        ParticipantIdentity(self.0.identity().0)
236    }
237
238    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
239        self.0
240            .track_publications()
241            .into_iter()
242            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
243            .collect()
244    }
245}
246
247impl RemoteAudioTrack {
248    pub fn sid(&self) -> TrackSid {
249        self.0.sid()
250    }
251}
252
253impl RemoteVideoTrack {
254    pub fn sid(&self) -> TrackSid {
255        self.0.sid()
256    }
257}
258
259impl RemoteTrackPublication {
260    pub fn is_muted(&self) -> bool {
261        self.0.is_muted()
262    }
263
264    pub fn is_enabled(&self) -> bool {
265        self.0.is_enabled()
266    }
267
268    pub fn track(&self) -> Option<RemoteTrack> {
269        self.0.track().map(remote_track_from_livekit)
270    }
271
272    pub fn is_audio(&self) -> bool {
273        self.0.kind() == livekit::track::TrackKind::Audio
274    }
275
276    pub fn set_enabled(&self, enabled: bool, cx: &App) {
277        let track = self.0.clone();
278        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
279    }
280
281    pub fn sid(&self) -> TrackSid {
282        self.0.sid()
283    }
284}
285
286impl Participant {
287    pub fn identity(&self) -> ParticipantIdentity {
288        match self {
289            Participant::Local(local_participant) => {
290                ParticipantIdentity(local_participant.0.identity().0)
291            }
292            Participant::Remote(remote_participant) => {
293                ParticipantIdentity(remote_participant.0.identity().0)
294            }
295        }
296    }
297}
298
299fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
300    match participant {
301        livekit::participant::Participant::Local(local) => {
302            Participant::Local(LocalParticipant(local))
303        }
304        livekit::participant::Participant::Remote(remote) => {
305            Participant::Remote(RemoteParticipant(remote))
306        }
307    }
308}
309
310fn publication_from_livekit(
311    publication: livekit::publication::TrackPublication,
312) -> TrackPublication {
313    match publication {
314        livekit::publication::TrackPublication::Local(local) => {
315            TrackPublication::Local(LocalTrackPublication(local))
316        }
317        livekit::publication::TrackPublication::Remote(remote) => {
318            TrackPublication::Remote(RemoteTrackPublication(remote))
319        }
320    }
321}
322
323fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
324    match track {
325        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
326        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
327    }
328}
329
330fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
331    match track {
332        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
333        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
334    }
335}
336fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
337    let event = match event {
338        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
339            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
340        }
341        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
342            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
343        }
344        livekit::RoomEvent::LocalTrackPublished {
345            publication,
346            track,
347            participant,
348        } => RoomEvent::LocalTrackPublished {
349            publication: LocalTrackPublication(publication),
350            track: local_track_from_livekit(track),
351            participant: LocalParticipant(participant),
352        },
353        livekit::RoomEvent::LocalTrackUnpublished {
354            publication,
355            participant,
356        } => RoomEvent::LocalTrackUnpublished {
357            publication: LocalTrackPublication(publication),
358            participant: LocalParticipant(participant),
359        },
360        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
361            track: local_track_from_livekit(track),
362        },
363        livekit::RoomEvent::TrackSubscribed {
364            track,
365            publication,
366            participant,
367        } => RoomEvent::TrackSubscribed {
368            track: remote_track_from_livekit(track),
369            publication: RemoteTrackPublication(publication),
370            participant: RemoteParticipant(participant),
371        },
372        livekit::RoomEvent::TrackUnsubscribed {
373            track,
374            publication,
375            participant,
376        } => RoomEvent::TrackUnsubscribed {
377            track: remote_track_from_livekit(track),
378            publication: RemoteTrackPublication(publication),
379            participant: RemoteParticipant(participant),
380        },
381        livekit::RoomEvent::TrackSubscriptionFailed {
382            participant,
383            error: _,
384            track_sid,
385        } => RoomEvent::TrackSubscriptionFailed {
386            participant: RemoteParticipant(participant),
387            track_sid,
388        },
389        livekit::RoomEvent::TrackPublished {
390            publication,
391            participant,
392        } => RoomEvent::TrackPublished {
393            publication: RemoteTrackPublication(publication),
394            participant: RemoteParticipant(participant),
395        },
396        livekit::RoomEvent::TrackUnpublished {
397            publication,
398            participant,
399        } => RoomEvent::TrackUnpublished {
400            publication: RemoteTrackPublication(publication),
401            participant: RemoteParticipant(participant),
402        },
403        livekit::RoomEvent::TrackMuted {
404            participant,
405            publication,
406        } => RoomEvent::TrackMuted {
407            publication: publication_from_livekit(publication),
408            participant: participant_from_livekit(participant),
409        },
410        livekit::RoomEvent::TrackUnmuted {
411            participant,
412            publication,
413        } => RoomEvent::TrackUnmuted {
414            publication: publication_from_livekit(publication),
415            participant: participant_from_livekit(participant),
416        },
417        livekit::RoomEvent::RoomMetadataChanged {
418            old_metadata,
419            metadata,
420        } => RoomEvent::RoomMetadataChanged {
421            old_metadata,
422            metadata,
423        },
424        livekit::RoomEvent::ParticipantMetadataChanged {
425            participant,
426            old_metadata,
427            metadata,
428        } => RoomEvent::ParticipantMetadataChanged {
429            participant: participant_from_livekit(participant),
430            old_metadata,
431            metadata,
432        },
433        livekit::RoomEvent::ParticipantNameChanged {
434            participant,
435            old_name,
436            name,
437        } => RoomEvent::ParticipantNameChanged {
438            participant: participant_from_livekit(participant),
439            old_name,
440            name,
441        },
442        livekit::RoomEvent::ParticipantAttributesChanged {
443            participant,
444            changed_attributes,
445        } => RoomEvent::ParticipantAttributesChanged {
446            participant: participant_from_livekit(participant),
447            changed_attributes: changed_attributes.into_iter().collect(),
448        },
449        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
450            RoomEvent::ActiveSpeakersChanged {
451                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
452            }
453        }
454        livekit::RoomEvent::Connected {
455            participants_with_tracks,
456        } => RoomEvent::Connected {
457            participants_with_tracks: participants_with_tracks
458                .into_iter()
459                .map({
460                    |(p, t)| {
461                        (
462                            RemoteParticipant(p),
463                            t.into_iter().map(RemoteTrackPublication).collect(),
464                        )
465                    }
466                })
467                .collect(),
468        },
469        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
470            reason: reason.as_str_name(),
471        },
472        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
473        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
474        _ => {
475            log::trace!("dropping livekit event: {:?}", event);
476            return None;
477        }
478    };
479
480    Some(event)
481}