livekit_client.rs

  1use std::sync::Arc;
  2
  3use anyhow::{Context as _, Result};
  4use audio::AudioSettings;
  5use collections::HashMap;
  6use futures::{SinkExt, channel::mpsc};
  7use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  8use gpui_tokio::Tokio;
  9use log::info;
 10use playback::capture_local_video_track;
 11use settings::Settings;
 12
 13mod playback;
 14
 15use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
 16pub use playback::AudioStream;
 17pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 18
 19#[derive(Clone, Debug)]
 20pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
 21#[derive(Clone, Debug)]
 22pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
 23#[derive(Clone, Debug)]
 24pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
 25#[derive(Clone, Debug)]
 26pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 27
 28#[derive(Clone, Debug)]
 29pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
 30#[derive(Clone, Debug)]
 31pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
 32#[derive(Clone, Debug)]
 33pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
 34#[derive(Clone, Debug)]
 35pub struct LocalParticipant(livekit::participant::LocalParticipant);
 36
 37pub struct Room {
 38    room: livekit::Room,
 39    _task: Task<()>,
 40    playback: playback::AudioStack,
 41}
 42
 43pub type TrackSid = livekit::id::TrackSid;
 44pub type ConnectionState = livekit::ConnectionState;
 45#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 46pub struct ParticipantIdentity(pub String);
 47
 48impl Room {
 49    pub async fn connect(
 50        url: String,
 51        token: String,
 52        cx: &mut AsyncApp,
 53    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 54        let connector =
 55            tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
 56        let mut config = livekit::RoomOptions::default();
 57        config.connector = Some(connector);
 58        let (room, mut events) = Tokio::spawn(cx, async move {
 59            livekit::Room::connect(&url, &token, config).await
 60        })?
 61        .await??;
 62
 63        let (mut tx, rx) = mpsc::unbounded();
 64        let task = cx.background_executor().spawn(async move {
 65            while let Some(event) = events.recv().await {
 66                if let Some(event) = room_event_from_livekit(event) {
 67                    tx.send(event).await.ok();
 68                }
 69            }
 70        });
 71
 72        Ok((
 73            Self {
 74                room,
 75                _task: task,
 76                playback: playback::AudioStack::new(cx.background_executor().clone()),
 77            },
 78            rx,
 79        ))
 80    }
 81
 82    pub fn local_participant(&self) -> LocalParticipant {
 83        LocalParticipant(self.room.local_participant())
 84    }
 85
 86    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 87        self.room
 88            .remote_participants()
 89            .into_iter()
 90            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 91            .collect()
 92    }
 93
 94    pub fn connection_state(&self) -> ConnectionState {
 95        self.room.connection_state()
 96    }
 97
 98    pub async fn publish_local_microphone_track(
 99        &self,
100        cx: &mut AsyncApp,
101    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
102        let (track, stream) = self.playback.capture_local_microphone_track()?;
103        let publication = self
104            .local_participant()
105            .publish_track(
106                livekit::track::LocalTrack::Audio(track.0),
107                livekit::options::TrackPublishOptions {
108                    source: livekit::track::TrackSource::Microphone,
109                    ..Default::default()
110                },
111                cx,
112            )
113            .await?;
114
115        Ok((publication, stream))
116    }
117
118    pub async fn unpublish_local_track(
119        &self,
120        sid: TrackSid,
121        cx: &mut AsyncApp,
122    ) -> Result<LocalTrackPublication> {
123        self.local_participant().unpublish_track(sid, cx).await
124    }
125
126    pub fn play_remote_audio_track(
127        &self,
128        track: &RemoteAudioTrack,
129        cx: &mut App,
130    ) -> Result<playback::AudioStream> {
131        if AudioSettings::get_global(cx).rodio_audio {
132            info!("Using experimental.rodio_audio audio pipeline");
133            playback::play_remote_audio_track(&track.0, cx)
134        } else {
135            Ok(self.playback.play_remote_audio_track(&track.0))
136        }
137    }
138}
139
140impl LocalParticipant {
141    pub async fn publish_screenshare_track(
142        &self,
143        source: &dyn ScreenCaptureSource,
144        cx: &mut AsyncApp,
145    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
146        let (track, stream) = capture_local_video_track(source, cx).await?;
147        let options = livekit::options::TrackPublishOptions {
148            source: livekit::track::TrackSource::Screenshare,
149            video_codec: livekit::options::VideoCodec::VP8,
150            ..Default::default()
151        };
152        let publication = self
153            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
154            .await?;
155
156        Ok((publication, stream))
157    }
158
159    async fn publish_track(
160        &self,
161        track: livekit::track::LocalTrack,
162        options: livekit::options::TrackPublishOptions,
163        cx: &mut AsyncApp,
164    ) -> Result<LocalTrackPublication> {
165        let participant = self.0.clone();
166        Tokio::spawn(cx, async move {
167            participant.publish_track(track, options).await
168        })?
169        .await?
170        .map(LocalTrackPublication)
171        .context("publishing a track")
172    }
173
174    pub async fn unpublish_track(
175        &self,
176        sid: TrackSid,
177        cx: &mut AsyncApp,
178    ) -> Result<LocalTrackPublication> {
179        let participant = self.0.clone();
180        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
181            .await?
182            .map(LocalTrackPublication)
183            .context("unpublishing a track")
184    }
185}
186
187impl LocalTrackPublication {
188    pub fn mute(&self, cx: &App) {
189        let track = self.0.clone();
190        Tokio::spawn(cx, async move {
191            track.mute();
192        })
193        .detach();
194    }
195
196    pub fn unmute(&self, cx: &App) {
197        let track = self.0.clone();
198        Tokio::spawn(cx, async move {
199            track.unmute();
200        })
201        .detach();
202    }
203
204    pub fn sid(&self) -> TrackSid {
205        self.0.sid()
206    }
207
208    pub fn is_muted(&self) -> bool {
209        self.0.is_muted()
210    }
211}
212
213impl RemoteParticipant {
214    pub fn identity(&self) -> ParticipantIdentity {
215        ParticipantIdentity(self.0.identity().0)
216    }
217
218    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
219        self.0
220            .track_publications()
221            .into_iter()
222            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
223            .collect()
224    }
225}
226
227impl RemoteAudioTrack {
228    pub fn sid(&self) -> TrackSid {
229        self.0.sid()
230    }
231}
232
233impl RemoteVideoTrack {
234    pub fn sid(&self) -> TrackSid {
235        self.0.sid()
236    }
237}
238
239impl RemoteTrackPublication {
240    pub fn is_muted(&self) -> bool {
241        self.0.is_muted()
242    }
243
244    pub fn is_enabled(&self) -> bool {
245        self.0.is_enabled()
246    }
247
248    pub fn track(&self) -> Option<RemoteTrack> {
249        self.0.track().map(remote_track_from_livekit)
250    }
251
252    pub fn is_audio(&self) -> bool {
253        self.0.kind() == livekit::track::TrackKind::Audio
254    }
255
256    pub fn set_enabled(&self, enabled: bool, cx: &App) {
257        let track = self.0.clone();
258        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
259    }
260
261    pub fn sid(&self) -> TrackSid {
262        self.0.sid()
263    }
264}
265
266impl Participant {
267    pub fn identity(&self) -> ParticipantIdentity {
268        match self {
269            Participant::Local(local_participant) => {
270                ParticipantIdentity(local_participant.0.identity().0)
271            }
272            Participant::Remote(remote_participant) => {
273                ParticipantIdentity(remote_participant.0.identity().0)
274            }
275        }
276    }
277}
278
279fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
280    match participant {
281        livekit::participant::Participant::Local(local) => {
282            Participant::Local(LocalParticipant(local))
283        }
284        livekit::participant::Participant::Remote(remote) => {
285            Participant::Remote(RemoteParticipant(remote))
286        }
287    }
288}
289
290fn publication_from_livekit(
291    publication: livekit::publication::TrackPublication,
292) -> TrackPublication {
293    match publication {
294        livekit::publication::TrackPublication::Local(local) => {
295            TrackPublication::Local(LocalTrackPublication(local))
296        }
297        livekit::publication::TrackPublication::Remote(remote) => {
298            TrackPublication::Remote(RemoteTrackPublication(remote))
299        }
300    }
301}
302
303fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
304    match track {
305        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
306        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
307    }
308}
309
310fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
311    match track {
312        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
313        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
314    }
315}
316fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
317    let event = match event {
318        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
319            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
320        }
321        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
322            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
323        }
324        livekit::RoomEvent::LocalTrackPublished {
325            publication,
326            track,
327            participant,
328        } => RoomEvent::LocalTrackPublished {
329            publication: LocalTrackPublication(publication),
330            track: local_track_from_livekit(track),
331            participant: LocalParticipant(participant),
332        },
333        livekit::RoomEvent::LocalTrackUnpublished {
334            publication,
335            participant,
336        } => RoomEvent::LocalTrackUnpublished {
337            publication: LocalTrackPublication(publication),
338            participant: LocalParticipant(participant),
339        },
340        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
341            track: local_track_from_livekit(track),
342        },
343        livekit::RoomEvent::TrackSubscribed {
344            track,
345            publication,
346            participant,
347        } => RoomEvent::TrackSubscribed {
348            track: remote_track_from_livekit(track),
349            publication: RemoteTrackPublication(publication),
350            participant: RemoteParticipant(participant),
351        },
352        livekit::RoomEvent::TrackUnsubscribed {
353            track,
354            publication,
355            participant,
356        } => RoomEvent::TrackUnsubscribed {
357            track: remote_track_from_livekit(track),
358            publication: RemoteTrackPublication(publication),
359            participant: RemoteParticipant(participant),
360        },
361        livekit::RoomEvent::TrackSubscriptionFailed {
362            participant,
363            error: _,
364            track_sid,
365        } => RoomEvent::TrackSubscriptionFailed {
366            participant: RemoteParticipant(participant),
367            track_sid,
368        },
369        livekit::RoomEvent::TrackPublished {
370            publication,
371            participant,
372        } => RoomEvent::TrackPublished {
373            publication: RemoteTrackPublication(publication),
374            participant: RemoteParticipant(participant),
375        },
376        livekit::RoomEvent::TrackUnpublished {
377            publication,
378            participant,
379        } => RoomEvent::TrackUnpublished {
380            publication: RemoteTrackPublication(publication),
381            participant: RemoteParticipant(participant),
382        },
383        livekit::RoomEvent::TrackMuted {
384            participant,
385            publication,
386        } => RoomEvent::TrackMuted {
387            publication: publication_from_livekit(publication),
388            participant: participant_from_livekit(participant),
389        },
390        livekit::RoomEvent::TrackUnmuted {
391            participant,
392            publication,
393        } => RoomEvent::TrackUnmuted {
394            publication: publication_from_livekit(publication),
395            participant: participant_from_livekit(participant),
396        },
397        livekit::RoomEvent::RoomMetadataChanged {
398            old_metadata,
399            metadata,
400        } => RoomEvent::RoomMetadataChanged {
401            old_metadata,
402            metadata,
403        },
404        livekit::RoomEvent::ParticipantMetadataChanged {
405            participant,
406            old_metadata,
407            metadata,
408        } => RoomEvent::ParticipantMetadataChanged {
409            participant: participant_from_livekit(participant),
410            old_metadata,
411            metadata,
412        },
413        livekit::RoomEvent::ParticipantNameChanged {
414            participant,
415            old_name,
416            name,
417        } => RoomEvent::ParticipantNameChanged {
418            participant: participant_from_livekit(participant),
419            old_name,
420            name,
421        },
422        livekit::RoomEvent::ParticipantAttributesChanged {
423            participant,
424            changed_attributes,
425        } => RoomEvent::ParticipantAttributesChanged {
426            participant: participant_from_livekit(participant),
427            changed_attributes: changed_attributes.into_iter().collect(),
428        },
429        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
430            RoomEvent::ActiveSpeakersChanged {
431                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
432            }
433        }
434        livekit::RoomEvent::Connected {
435            participants_with_tracks,
436        } => RoomEvent::Connected {
437            participants_with_tracks: participants_with_tracks
438                .into_iter()
439                .map({
440                    |(p, t)| {
441                        (
442                            RemoteParticipant(p),
443                            t.into_iter().map(RemoteTrackPublication).collect(),
444                        )
445                    }
446                })
447                .collect(),
448        },
449        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
450            reason: reason.as_str_name(),
451        },
452        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
453        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
454        _ => {
455            log::trace!("dropping livekit event: {:?}", event);
456            return None;
457        }
458    };
459
460    Some(event)
461}