livekit_client.rs

  1use std::sync::Arc;
  2
  3use anyhow::{Context as _, Result};
  4use collections::HashMap;
  5use futures::{SinkExt, channel::mpsc};
  6use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  7use gpui_tokio::Tokio;
  8use playback::capture_local_video_track;
  9
 10mod playback;
 11#[cfg(feature = "record-microphone")]
 12mod record;
 13
 14use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
 15pub use playback::AudioStream;
 16pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 17
 18#[derive(Clone, Debug)]
 19pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
 20#[derive(Clone, Debug)]
 21pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
 22#[derive(Clone, Debug)]
 23pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
 24#[derive(Clone, Debug)]
 25pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 26
 27#[derive(Clone, Debug)]
 28pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
 29#[derive(Clone, Debug)]
 30pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
 31#[derive(Clone, Debug)]
 32pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
 33#[derive(Clone, Debug)]
 34pub struct LocalParticipant(livekit::participant::LocalParticipant);
 35
 36pub struct Room {
 37    room: livekit::Room,
 38    _task: Task<()>,
 39    playback: playback::AudioStack,
 40}
 41
 42pub type TrackSid = livekit::id::TrackSid;
 43pub type ConnectionState = livekit::ConnectionState;
 44#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 45pub struct ParticipantIdentity(pub String);
 46
 47impl Room {
 48    pub async fn connect(
 49        url: String,
 50        token: String,
 51        cx: &mut AsyncApp,
 52    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 53        let connector =
 54            tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
 55        let mut config = livekit::RoomOptions::default();
 56        config.connector = Some(connector);
 57        let (room, mut events) = Tokio::spawn(cx, async move {
 58            livekit::Room::connect(&url, &token, config).await
 59        })?
 60        .await??;
 61
 62        let (mut tx, rx) = mpsc::unbounded();
 63        let task = cx.background_executor().spawn(async move {
 64            while let Some(event) = events.recv().await {
 65                if let Some(event) = room_event_from_livekit(event) {
 66                    tx.send(event).await.ok();
 67                }
 68            }
 69        });
 70
 71        Ok((
 72            Self {
 73                room,
 74                _task: task,
 75                playback: playback::AudioStack::new(cx.background_executor().clone()),
 76            },
 77            rx,
 78        ))
 79    }
 80
 81    pub fn local_participant(&self) -> LocalParticipant {
 82        LocalParticipant(self.room.local_participant())
 83    }
 84
 85    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 86        self.room
 87            .remote_participants()
 88            .into_iter()
 89            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 90            .collect()
 91    }
 92
 93    pub fn connection_state(&self) -> ConnectionState {
 94        self.room.connection_state()
 95    }
 96
 97    pub async fn publish_local_microphone_track(
 98        &self,
 99        cx: &mut AsyncApp,
100    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
101        let (track, stream) = self.playback.capture_local_microphone_track()?;
102        let publication = self
103            .local_participant()
104            .publish_track(
105                livekit::track::LocalTrack::Audio(track.0),
106                livekit::options::TrackPublishOptions {
107                    source: livekit::track::TrackSource::Microphone,
108                    ..Default::default()
109                },
110                cx,
111            )
112            .await?;
113
114        Ok((publication, stream))
115    }
116
117    pub async fn unpublish_local_track(
118        &self,
119        sid: TrackSid,
120        cx: &mut AsyncApp,
121    ) -> Result<LocalTrackPublication> {
122        self.local_participant().unpublish_track(sid, cx).await
123    }
124
125    pub fn play_remote_audio_track(
126        &self,
127        track: &RemoteAudioTrack,
128        _cx: &App,
129    ) -> Result<playback::AudioStream> {
130        Ok(self.playback.play_remote_audio_track(&track.0))
131    }
132}
133
134impl LocalParticipant {
135    pub async fn publish_screenshare_track(
136        &self,
137        source: &dyn ScreenCaptureSource,
138        cx: &mut AsyncApp,
139    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
140        let (track, stream) = capture_local_video_track(source, cx).await?;
141        let options = livekit::options::TrackPublishOptions {
142            source: livekit::track::TrackSource::Screenshare,
143            video_codec: livekit::options::VideoCodec::VP8,
144            ..Default::default()
145        };
146        let publication = self
147            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
148            .await?;
149
150        Ok((publication, stream))
151    }
152
153    async fn publish_track(
154        &self,
155        track: livekit::track::LocalTrack,
156        options: livekit::options::TrackPublishOptions,
157        cx: &mut AsyncApp,
158    ) -> Result<LocalTrackPublication> {
159        let participant = self.0.clone();
160        Tokio::spawn(cx, async move {
161            participant.publish_track(track, options).await
162        })?
163        .await?
164        .map(LocalTrackPublication)
165        .context("publishing a track")
166    }
167
168    pub async fn unpublish_track(
169        &self,
170        sid: TrackSid,
171        cx: &mut AsyncApp,
172    ) -> Result<LocalTrackPublication> {
173        let participant = self.0.clone();
174        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
175            .await?
176            .map(LocalTrackPublication)
177            .context("unpublishing a track")
178    }
179}
180
181impl LocalTrackPublication {
182    pub fn mute(&self, cx: &App) {
183        let track = self.0.clone();
184        Tokio::spawn(cx, async move {
185            track.mute();
186        })
187        .detach();
188    }
189
190    pub fn unmute(&self, cx: &App) {
191        let track = self.0.clone();
192        Tokio::spawn(cx, async move {
193            track.unmute();
194        })
195        .detach();
196    }
197
198    pub fn sid(&self) -> TrackSid {
199        self.0.sid()
200    }
201
202    pub fn is_muted(&self) -> bool {
203        self.0.is_muted()
204    }
205}
206
207impl RemoteParticipant {
208    pub fn identity(&self) -> ParticipantIdentity {
209        ParticipantIdentity(self.0.identity().0)
210    }
211
212    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
213        self.0
214            .track_publications()
215            .into_iter()
216            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
217            .collect()
218    }
219}
220
221impl RemoteAudioTrack {
222    pub fn sid(&self) -> TrackSid {
223        self.0.sid()
224    }
225}
226
227impl RemoteVideoTrack {
228    pub fn sid(&self) -> TrackSid {
229        self.0.sid()
230    }
231}
232
233impl RemoteTrackPublication {
234    pub fn is_muted(&self) -> bool {
235        self.0.is_muted()
236    }
237
238    pub fn is_enabled(&self) -> bool {
239        self.0.is_enabled()
240    }
241
242    pub fn track(&self) -> Option<RemoteTrack> {
243        self.0.track().map(remote_track_from_livekit)
244    }
245
246    pub fn is_audio(&self) -> bool {
247        self.0.kind() == livekit::track::TrackKind::Audio
248    }
249
250    pub fn set_enabled(&self, enabled: bool, cx: &App) {
251        let track = self.0.clone();
252        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
253    }
254
255    pub fn sid(&self) -> TrackSid {
256        self.0.sid()
257    }
258}
259
260impl Participant {
261    pub fn identity(&self) -> ParticipantIdentity {
262        match self {
263            Participant::Local(local_participant) => {
264                ParticipantIdentity(local_participant.0.identity().0)
265            }
266            Participant::Remote(remote_participant) => {
267                ParticipantIdentity(remote_participant.0.identity().0)
268            }
269        }
270    }
271}
272
273fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
274    match participant {
275        livekit::participant::Participant::Local(local) => {
276            Participant::Local(LocalParticipant(local))
277        }
278        livekit::participant::Participant::Remote(remote) => {
279            Participant::Remote(RemoteParticipant(remote))
280        }
281    }
282}
283
284fn publication_from_livekit(
285    publication: livekit::publication::TrackPublication,
286) -> TrackPublication {
287    match publication {
288        livekit::publication::TrackPublication::Local(local) => {
289            TrackPublication::Local(LocalTrackPublication(local))
290        }
291        livekit::publication::TrackPublication::Remote(remote) => {
292            TrackPublication::Remote(RemoteTrackPublication(remote))
293        }
294    }
295}
296
297fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
298    match track {
299        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
300        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
301    }
302}
303
304fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
305    match track {
306        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
307        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
308    }
309}
310fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
311    let event = match event {
312        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
313            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
314        }
315        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
316            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
317        }
318        livekit::RoomEvent::LocalTrackPublished {
319            publication,
320            track,
321            participant,
322        } => RoomEvent::LocalTrackPublished {
323            publication: LocalTrackPublication(publication),
324            track: local_track_from_livekit(track),
325            participant: LocalParticipant(participant),
326        },
327        livekit::RoomEvent::LocalTrackUnpublished {
328            publication,
329            participant,
330        } => RoomEvent::LocalTrackUnpublished {
331            publication: LocalTrackPublication(publication),
332            participant: LocalParticipant(participant),
333        },
334        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
335            track: local_track_from_livekit(track),
336        },
337        livekit::RoomEvent::TrackSubscribed {
338            track,
339            publication,
340            participant,
341        } => RoomEvent::TrackSubscribed {
342            track: remote_track_from_livekit(track),
343            publication: RemoteTrackPublication(publication),
344            participant: RemoteParticipant(participant),
345        },
346        livekit::RoomEvent::TrackUnsubscribed {
347            track,
348            publication,
349            participant,
350        } => RoomEvent::TrackUnsubscribed {
351            track: remote_track_from_livekit(track),
352            publication: RemoteTrackPublication(publication),
353            participant: RemoteParticipant(participant),
354        },
355        livekit::RoomEvent::TrackSubscriptionFailed {
356            participant,
357            error: _,
358            track_sid,
359        } => RoomEvent::TrackSubscriptionFailed {
360            participant: RemoteParticipant(participant),
361            track_sid,
362        },
363        livekit::RoomEvent::TrackPublished {
364            publication,
365            participant,
366        } => RoomEvent::TrackPublished {
367            publication: RemoteTrackPublication(publication),
368            participant: RemoteParticipant(participant),
369        },
370        livekit::RoomEvent::TrackUnpublished {
371            publication,
372            participant,
373        } => RoomEvent::TrackUnpublished {
374            publication: RemoteTrackPublication(publication),
375            participant: RemoteParticipant(participant),
376        },
377        livekit::RoomEvent::TrackMuted {
378            participant,
379            publication,
380        } => RoomEvent::TrackMuted {
381            publication: publication_from_livekit(publication),
382            participant: participant_from_livekit(participant),
383        },
384        livekit::RoomEvent::TrackUnmuted {
385            participant,
386            publication,
387        } => RoomEvent::TrackUnmuted {
388            publication: publication_from_livekit(publication),
389            participant: participant_from_livekit(participant),
390        },
391        livekit::RoomEvent::RoomMetadataChanged {
392            old_metadata,
393            metadata,
394        } => RoomEvent::RoomMetadataChanged {
395            old_metadata,
396            metadata,
397        },
398        livekit::RoomEvent::ParticipantMetadataChanged {
399            participant,
400            old_metadata,
401            metadata,
402        } => RoomEvent::ParticipantMetadataChanged {
403            participant: participant_from_livekit(participant),
404            old_metadata,
405            metadata,
406        },
407        livekit::RoomEvent::ParticipantNameChanged {
408            participant,
409            old_name,
410            name,
411        } => RoomEvent::ParticipantNameChanged {
412            participant: participant_from_livekit(participant),
413            old_name,
414            name,
415        },
416        livekit::RoomEvent::ParticipantAttributesChanged {
417            participant,
418            changed_attributes,
419        } => RoomEvent::ParticipantAttributesChanged {
420            participant: participant_from_livekit(participant),
421            changed_attributes: changed_attributes.into_iter().collect(),
422        },
423        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
424            RoomEvent::ActiveSpeakersChanged {
425                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
426            }
427        }
428        livekit::RoomEvent::Connected {
429            participants_with_tracks,
430        } => RoomEvent::Connected {
431            participants_with_tracks: participants_with_tracks
432                .into_iter()
433                .map({
434                    |(p, t)| {
435                        (
436                            RemoteParticipant(p),
437                            t.into_iter().map(RemoteTrackPublication).collect(),
438                        )
439                    }
440                })
441                .collect(),
442        },
443        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
444            reason: reason.as_str_name(),
445        },
446        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
447        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
448        _ => {
449            log::trace!("dropping livekit event: {:?}", event);
450            return None;
451        }
452    };
453
454    Some(event)
455}