livekit_client.rs

  1use std::sync::Arc;
  2
  3use anyhow::{Context as _, Result};
  4use audio::AudioSettings;
  5use collections::HashMap;
  6use futures::{SinkExt, channel::mpsc};
  7use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  8use gpui_tokio::Tokio;
  9use log::info;
 10use playback::capture_local_video_track;
 11use settings::Settings;
 12
 13mod playback;
 14
 15use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
 16pub use playback::AudioStream;
 17pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 18
/// Newtype wrapper around the LiveKit SDK's `livekit::track::RemoteVideoTrack`.
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype wrapper around the LiveKit SDK's `livekit::track::RemoteAudioTrack`.
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype wrapper around the LiveKit SDK's
/// `livekit::publication::RemoteTrackPublication`.
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype wrapper around the LiveKit SDK's
/// `livekit::participant::RemoteParticipant`.
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);

/// Newtype wrapper around the LiveKit SDK's `livekit::track::LocalVideoTrack`.
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype wrapper around the LiveKit SDK's `livekit::track::LocalAudioTrack`.
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype wrapper around the LiveKit SDK's
/// `livekit::publication::LocalTrackPublication`.
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype wrapper around the LiveKit SDK's
/// `livekit::participant::LocalParticipant`.
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
 36
/// A LiveKit room as produced by `Room::connect`, bundled with the resources
/// that are created alongside it.
pub struct Room {
    /// Handle to the underlying LiveKit SDK room.
    room: livekit::Room,
    /// Background task spawned in `Room::connect` that converts LiveKit events
    /// and forwards them to the receiver returned next to the room. It is never
    /// read; presumably it is stored only to keep the task alive (TODO confirm
    /// against `gpui::Task` drop semantics).
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and, unless the
    /// `rodio_audio` setting is enabled, to play remote audio tracks.
    playback: playback::AudioStack,
}
 42
/// Track identifier type, aliased directly from the LiveKit SDK.
pub type TrackSid = livekit::id::TrackSid;
/// Connection state type, aliased directly from the LiveKit SDK.
pub type ConnectionState = livekit::ConnectionState;
/// Identity of a participant, held as a plain `String`.
///
/// Values are built in this module from the string inside the LiveKit SDK's
/// own participant identity.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
 47
 48impl Room {
 49    pub async fn connect(
 50        url: String,
 51        token: String,
 52        cx: &mut AsyncApp,
 53    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 54        let connector =
 55            tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
 56        let mut config = livekit::RoomOptions::default();
 57        config.connector = Some(connector);
 58        let (room, mut events) = Tokio::spawn(cx, async move {
 59            livekit::Room::connect(&url, &token, config).await
 60        })?
 61        .await??;
 62
 63        let (mut tx, rx) = mpsc::unbounded();
 64        let task = cx.background_executor().spawn(async move {
 65            while let Some(event) = events.recv().await {
 66                if let Some(event) = room_event_from_livekit(event) {
 67                    tx.send(event).await.ok();
 68                }
 69            }
 70        });
 71
 72        Ok((
 73            Self {
 74                room,
 75                _task: task,
 76                playback: playback::AudioStack::new(cx.background_executor().clone()),
 77            },
 78            rx,
 79        ))
 80    }
 81
 82    pub fn local_participant(&self) -> LocalParticipant {
 83        LocalParticipant(self.room.local_participant())
 84    }
 85
 86    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 87        self.room
 88            .remote_participants()
 89            .into_iter()
 90            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 91            .collect()
 92    }
 93
 94    pub fn connection_state(&self) -> ConnectionState {
 95        self.room.connection_state()
 96    }
 97
 98    pub async fn publish_local_microphone_track(
 99        &self,
100        user_name: String,
101        is_staff: bool,
102        cx: &mut AsyncApp,
103    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
104        let (track, stream) = self
105            .playback
106            .capture_local_microphone_track(user_name, is_staff, &cx)?;
107        let publication = self
108            .local_participant()
109            .publish_track(
110                livekit::track::LocalTrack::Audio(track.0),
111                livekit::options::TrackPublishOptions {
112                    source: livekit::track::TrackSource::Microphone,
113                    ..Default::default()
114                },
115                cx,
116            )
117            .await?;
118
119        Ok((publication, stream))
120    }
121
122    pub async fn unpublish_local_track(
123        &self,
124        sid: TrackSid,
125        cx: &mut AsyncApp,
126    ) -> Result<LocalTrackPublication> {
127        self.local_participant().unpublish_track(sid, cx).await
128    }
129
130    pub fn play_remote_audio_track(
131        &self,
132        track: &RemoteAudioTrack,
133        cx: &mut App,
134    ) -> Result<playback::AudioStream> {
135        if AudioSettings::get_global(cx).rodio_audio {
136            info!("Using experimental.rodio_audio audio pipeline for output");
137            playback::play_remote_audio_track(&track.0, cx)
138        } else {
139            Ok(self.playback.play_remote_audio_track(&track.0))
140        }
141    }
142}
143
144impl LocalParticipant {
145    pub async fn publish_screenshare_track(
146        &self,
147        source: &dyn ScreenCaptureSource,
148        cx: &mut AsyncApp,
149    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
150        let (track, stream) = capture_local_video_track(source, cx).await?;
151        let options = livekit::options::TrackPublishOptions {
152            source: livekit::track::TrackSource::Screenshare,
153            video_codec: livekit::options::VideoCodec::VP8,
154            ..Default::default()
155        };
156        let publication = self
157            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
158            .await?;
159
160        Ok((publication, stream))
161    }
162
163    async fn publish_track(
164        &self,
165        track: livekit::track::LocalTrack,
166        options: livekit::options::TrackPublishOptions,
167        cx: &mut AsyncApp,
168    ) -> Result<LocalTrackPublication> {
169        let participant = self.0.clone();
170        Tokio::spawn(cx, async move {
171            participant.publish_track(track, options).await
172        })?
173        .await?
174        .map(LocalTrackPublication)
175        .context("publishing a track")
176    }
177
178    pub async fn unpublish_track(
179        &self,
180        sid: TrackSid,
181        cx: &mut AsyncApp,
182    ) -> Result<LocalTrackPublication> {
183        let participant = self.0.clone();
184        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
185            .await?
186            .map(LocalTrackPublication)
187            .context("unpublishing a track")
188    }
189}
190
191impl LocalTrackPublication {
192    pub fn mute(&self, cx: &App) {
193        let track = self.0.clone();
194        Tokio::spawn(cx, async move {
195            track.mute();
196        })
197        .detach();
198    }
199
200    pub fn unmute(&self, cx: &App) {
201        let track = self.0.clone();
202        Tokio::spawn(cx, async move {
203            track.unmute();
204        })
205        .detach();
206    }
207
208    pub fn sid(&self) -> TrackSid {
209        self.0.sid()
210    }
211
212    pub fn is_muted(&self) -> bool {
213        self.0.is_muted()
214    }
215}
216
217impl RemoteParticipant {
218    pub fn identity(&self) -> ParticipantIdentity {
219        ParticipantIdentity(self.0.identity().0)
220    }
221
222    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
223        self.0
224            .track_publications()
225            .into_iter()
226            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
227            .collect()
228    }
229}
230
231impl RemoteAudioTrack {
232    pub fn sid(&self) -> TrackSid {
233        self.0.sid()
234    }
235}
236
237impl RemoteVideoTrack {
238    pub fn sid(&self) -> TrackSid {
239        self.0.sid()
240    }
241}
242
243impl RemoteTrackPublication {
244    pub fn is_muted(&self) -> bool {
245        self.0.is_muted()
246    }
247
248    pub fn is_enabled(&self) -> bool {
249        self.0.is_enabled()
250    }
251
252    pub fn track(&self) -> Option<RemoteTrack> {
253        self.0.track().map(remote_track_from_livekit)
254    }
255
256    pub fn is_audio(&self) -> bool {
257        self.0.kind() == livekit::track::TrackKind::Audio
258    }
259
260    pub fn set_enabled(&self, enabled: bool, cx: &App) {
261        let track = self.0.clone();
262        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
263    }
264
265    pub fn sid(&self) -> TrackSid {
266        self.0.sid()
267    }
268}
269
270impl Participant {
271    pub fn identity(&self) -> ParticipantIdentity {
272        match self {
273            Participant::Local(local_participant) => {
274                ParticipantIdentity(local_participant.0.identity().0)
275            }
276            Participant::Remote(remote_participant) => {
277                ParticipantIdentity(remote_participant.0.identity().0)
278            }
279        }
280    }
281}
282
283fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
284    match participant {
285        livekit::participant::Participant::Local(local) => {
286            Participant::Local(LocalParticipant(local))
287        }
288        livekit::participant::Participant::Remote(remote) => {
289            Participant::Remote(RemoteParticipant(remote))
290        }
291    }
292}
293
294fn publication_from_livekit(
295    publication: livekit::publication::TrackPublication,
296) -> TrackPublication {
297    match publication {
298        livekit::publication::TrackPublication::Local(local) => {
299            TrackPublication::Local(LocalTrackPublication(local))
300        }
301        livekit::publication::TrackPublication::Remote(remote) => {
302            TrackPublication::Remote(RemoteTrackPublication(remote))
303        }
304    }
305}
306
307fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
308    match track {
309        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
310        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
311    }
312}
313
314fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
315    match track {
316        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
317        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
318    }
319}
320fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
321    let event = match event {
322        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
323            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
324        }
325        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
326            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
327        }
328        livekit::RoomEvent::LocalTrackPublished {
329            publication,
330            track,
331            participant,
332        } => RoomEvent::LocalTrackPublished {
333            publication: LocalTrackPublication(publication),
334            track: local_track_from_livekit(track),
335            participant: LocalParticipant(participant),
336        },
337        livekit::RoomEvent::LocalTrackUnpublished {
338            publication,
339            participant,
340        } => RoomEvent::LocalTrackUnpublished {
341            publication: LocalTrackPublication(publication),
342            participant: LocalParticipant(participant),
343        },
344        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
345            track: local_track_from_livekit(track),
346        },
347        livekit::RoomEvent::TrackSubscribed {
348            track,
349            publication,
350            participant,
351        } => RoomEvent::TrackSubscribed {
352            track: remote_track_from_livekit(track),
353            publication: RemoteTrackPublication(publication),
354            participant: RemoteParticipant(participant),
355        },
356        livekit::RoomEvent::TrackUnsubscribed {
357            track,
358            publication,
359            participant,
360        } => RoomEvent::TrackUnsubscribed {
361            track: remote_track_from_livekit(track),
362            publication: RemoteTrackPublication(publication),
363            participant: RemoteParticipant(participant),
364        },
365        livekit::RoomEvent::TrackSubscriptionFailed {
366            participant,
367            error: _,
368            track_sid,
369        } => RoomEvent::TrackSubscriptionFailed {
370            participant: RemoteParticipant(participant),
371            track_sid,
372        },
373        livekit::RoomEvent::TrackPublished {
374            publication,
375            participant,
376        } => RoomEvent::TrackPublished {
377            publication: RemoteTrackPublication(publication),
378            participant: RemoteParticipant(participant),
379        },
380        livekit::RoomEvent::TrackUnpublished {
381            publication,
382            participant,
383        } => RoomEvent::TrackUnpublished {
384            publication: RemoteTrackPublication(publication),
385            participant: RemoteParticipant(participant),
386        },
387        livekit::RoomEvent::TrackMuted {
388            participant,
389            publication,
390        } => RoomEvent::TrackMuted {
391            publication: publication_from_livekit(publication),
392            participant: participant_from_livekit(participant),
393        },
394        livekit::RoomEvent::TrackUnmuted {
395            participant,
396            publication,
397        } => RoomEvent::TrackUnmuted {
398            publication: publication_from_livekit(publication),
399            participant: participant_from_livekit(participant),
400        },
401        livekit::RoomEvent::RoomMetadataChanged {
402            old_metadata,
403            metadata,
404        } => RoomEvent::RoomMetadataChanged {
405            old_metadata,
406            metadata,
407        },
408        livekit::RoomEvent::ParticipantMetadataChanged {
409            participant,
410            old_metadata,
411            metadata,
412        } => RoomEvent::ParticipantMetadataChanged {
413            participant: participant_from_livekit(participant),
414            old_metadata,
415            metadata,
416        },
417        livekit::RoomEvent::ParticipantNameChanged {
418            participant,
419            old_name,
420            name,
421        } => RoomEvent::ParticipantNameChanged {
422            participant: participant_from_livekit(participant),
423            old_name,
424            name,
425        },
426        livekit::RoomEvent::ParticipantAttributesChanged {
427            participant,
428            changed_attributes,
429        } => RoomEvent::ParticipantAttributesChanged {
430            participant: participant_from_livekit(participant),
431            changed_attributes: changed_attributes.into_iter().collect(),
432        },
433        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
434            RoomEvent::ActiveSpeakersChanged {
435                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
436            }
437        }
438        livekit::RoomEvent::Connected {
439            participants_with_tracks,
440        } => RoomEvent::Connected {
441            participants_with_tracks: participants_with_tracks
442                .into_iter()
443                .map({
444                    |(p, t)| {
445                        (
446                            RemoteParticipant(p),
447                            t.into_iter().map(RemoteTrackPublication).collect(),
448                        )
449                    }
450                })
451                .collect(),
452        },
453        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
454            reason: reason.as_str_name(),
455        },
456        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
457        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
458        _ => {
459            log::trace!("dropping livekit event: {:?}", event);
460            return None;
461        }
462    };
463
464    Some(event)
465}