livekit_client.rs

  1use std::sync::Arc;
  2
  3use anyhow::{Context as _, Result};
  4use collections::HashMap;
  5use futures::{SinkExt, channel::mpsc};
  6use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  7use gpui_tokio::Tokio;
  8use playback::capture_local_video_track;
  9
 10mod playback;
 11
 12use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
 13pub use playback::AudioStream;
 14pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 15
 16#[derive(Clone, Debug)]
 17pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
 18#[derive(Clone, Debug)]
 19pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
 20#[derive(Clone, Debug)]
 21pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
 22#[derive(Clone, Debug)]
 23pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 24
 25#[derive(Clone, Debug)]
 26pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
 27#[derive(Clone, Debug)]
 28pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
 29#[derive(Clone, Debug)]
 30pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
 31#[derive(Clone, Debug)]
 32pub struct LocalParticipant(livekit::participant::LocalParticipant);
 33
 34pub struct Room {
 35    room: livekit::Room,
 36    _task: Task<()>,
 37    playback: playback::AudioStack,
 38}
 39
 40pub type TrackSid = livekit::id::TrackSid;
 41pub type ConnectionState = livekit::ConnectionState;
 42#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 43pub struct ParticipantIdentity(pub String);
 44
 45impl Room {
 46    pub async fn connect(
 47        url: String,
 48        token: String,
 49        cx: &mut AsyncApp,
 50    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 51        let connector =
 52            tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
 53        let mut config = livekit::RoomOptions::default();
 54        config.connector = Some(connector);
 55        let (room, mut events) = Tokio::spawn(cx, async move {
 56            livekit::Room::connect(&url, &token, config).await
 57        })?
 58        .await??;
 59
 60        let (mut tx, rx) = mpsc::unbounded();
 61        let task = cx.background_executor().spawn(async move {
 62            while let Some(event) = events.recv().await {
 63                if let Some(event) = room_event_from_livekit(event) {
 64                    tx.send(event).await.ok();
 65                }
 66            }
 67        });
 68
 69        Ok((
 70            Self {
 71                room,
 72                _task: task,
 73                playback: playback::AudioStack::new(cx.background_executor().clone()),
 74            },
 75            rx,
 76        ))
 77    }
 78
 79    pub fn local_participant(&self) -> LocalParticipant {
 80        LocalParticipant(self.room.local_participant())
 81    }
 82
 83    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 84        self.room
 85            .remote_participants()
 86            .into_iter()
 87            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 88            .collect()
 89    }
 90
 91    pub fn connection_state(&self) -> ConnectionState {
 92        self.room.connection_state()
 93    }
 94
 95    pub async fn publish_local_microphone_track(
 96        &self,
 97        cx: &mut AsyncApp,
 98    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
 99        let (track, stream) = self.playback.capture_local_microphone_track()?;
100        let publication = self
101            .local_participant()
102            .publish_track(
103                livekit::track::LocalTrack::Audio(track.0),
104                livekit::options::TrackPublishOptions {
105                    source: livekit::track::TrackSource::Microphone,
106                    ..Default::default()
107                },
108                cx,
109            )
110            .await?;
111
112        Ok((publication, stream))
113    }
114
115    pub async fn unpublish_local_track(
116        &self,
117        sid: TrackSid,
118        cx: &mut AsyncApp,
119    ) -> Result<LocalTrackPublication> {
120        self.local_participant().unpublish_track(sid, cx).await
121    }
122
123    pub fn play_remote_audio_track(
124        &self,
125        track: &RemoteAudioTrack,
126        _cx: &App,
127    ) -> Result<playback::AudioStream> {
128        Ok(self.playback.play_remote_audio_track(&track.0))
129    }
130}
131
132impl LocalParticipant {
133    pub async fn publish_screenshare_track(
134        &self,
135        source: &dyn ScreenCaptureSource,
136        cx: &mut AsyncApp,
137    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
138        let (track, stream) = capture_local_video_track(source, cx).await?;
139        let options = livekit::options::TrackPublishOptions {
140            source: livekit::track::TrackSource::Screenshare,
141            video_codec: livekit::options::VideoCodec::VP8,
142            ..Default::default()
143        };
144        let publication = self
145            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
146            .await?;
147
148        Ok((publication, stream))
149    }
150
151    async fn publish_track(
152        &self,
153        track: livekit::track::LocalTrack,
154        options: livekit::options::TrackPublishOptions,
155        cx: &mut AsyncApp,
156    ) -> Result<LocalTrackPublication> {
157        let participant = self.0.clone();
158        Tokio::spawn(cx, async move {
159            participant.publish_track(track, options).await
160        })?
161        .await?
162        .map(LocalTrackPublication)
163        .context("publishing a track")
164    }
165
166    pub async fn unpublish_track(
167        &self,
168        sid: TrackSid,
169        cx: &mut AsyncApp,
170    ) -> Result<LocalTrackPublication> {
171        let participant = self.0.clone();
172        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
173            .await?
174            .map(LocalTrackPublication)
175            .context("unpublishing a track")
176    }
177}
178
179impl LocalTrackPublication {
180    pub fn mute(&self, cx: &App) {
181        let track = self.0.clone();
182        Tokio::spawn(cx, async move {
183            track.mute();
184        })
185        .detach();
186    }
187
188    pub fn unmute(&self, cx: &App) {
189        let track = self.0.clone();
190        Tokio::spawn(cx, async move {
191            track.unmute();
192        })
193        .detach();
194    }
195
196    pub fn sid(&self) -> TrackSid {
197        self.0.sid()
198    }
199
200    pub fn is_muted(&self) -> bool {
201        self.0.is_muted()
202    }
203}
204
205impl RemoteParticipant {
206    pub fn identity(&self) -> ParticipantIdentity {
207        ParticipantIdentity(self.0.identity().0)
208    }
209
210    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
211        self.0
212            .track_publications()
213            .into_iter()
214            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
215            .collect()
216    }
217}
218
219impl RemoteAudioTrack {
220    pub fn sid(&self) -> TrackSid {
221        self.0.sid()
222    }
223}
224
225impl RemoteVideoTrack {
226    pub fn sid(&self) -> TrackSid {
227        self.0.sid()
228    }
229}
230
231impl RemoteTrackPublication {
232    pub fn is_muted(&self) -> bool {
233        self.0.is_muted()
234    }
235
236    pub fn is_enabled(&self) -> bool {
237        self.0.is_enabled()
238    }
239
240    pub fn track(&self) -> Option<RemoteTrack> {
241        self.0.track().map(remote_track_from_livekit)
242    }
243
244    pub fn is_audio(&self) -> bool {
245        self.0.kind() == livekit::track::TrackKind::Audio
246    }
247
248    pub fn set_enabled(&self, enabled: bool, cx: &App) {
249        let track = self.0.clone();
250        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
251    }
252
253    pub fn sid(&self) -> TrackSid {
254        self.0.sid()
255    }
256}
257
258impl Participant {
259    pub fn identity(&self) -> ParticipantIdentity {
260        match self {
261            Participant::Local(local_participant) => {
262                ParticipantIdentity(local_participant.0.identity().0)
263            }
264            Participant::Remote(remote_participant) => {
265                ParticipantIdentity(remote_participant.0.identity().0)
266            }
267        }
268    }
269}
270
271fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
272    match participant {
273        livekit::participant::Participant::Local(local) => {
274            Participant::Local(LocalParticipant(local))
275        }
276        livekit::participant::Participant::Remote(remote) => {
277            Participant::Remote(RemoteParticipant(remote))
278        }
279    }
280}
281
282fn publication_from_livekit(
283    publication: livekit::publication::TrackPublication,
284) -> TrackPublication {
285    match publication {
286        livekit::publication::TrackPublication::Local(local) => {
287            TrackPublication::Local(LocalTrackPublication(local))
288        }
289        livekit::publication::TrackPublication::Remote(remote) => {
290            TrackPublication::Remote(RemoteTrackPublication(remote))
291        }
292    }
293}
294
295fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
296    match track {
297        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
298        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
299    }
300}
301
302fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
303    match track {
304        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
305        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
306    }
307}
308fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
309    let event = match event {
310        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
311            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
312        }
313        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
314            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
315        }
316        livekit::RoomEvent::LocalTrackPublished {
317            publication,
318            track,
319            participant,
320        } => RoomEvent::LocalTrackPublished {
321            publication: LocalTrackPublication(publication),
322            track: local_track_from_livekit(track),
323            participant: LocalParticipant(participant),
324        },
325        livekit::RoomEvent::LocalTrackUnpublished {
326            publication,
327            participant,
328        } => RoomEvent::LocalTrackUnpublished {
329            publication: LocalTrackPublication(publication),
330            participant: LocalParticipant(participant),
331        },
332        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
333            track: local_track_from_livekit(track),
334        },
335        livekit::RoomEvent::TrackSubscribed {
336            track,
337            publication,
338            participant,
339        } => RoomEvent::TrackSubscribed {
340            track: remote_track_from_livekit(track),
341            publication: RemoteTrackPublication(publication),
342            participant: RemoteParticipant(participant),
343        },
344        livekit::RoomEvent::TrackUnsubscribed {
345            track,
346            publication,
347            participant,
348        } => RoomEvent::TrackUnsubscribed {
349            track: remote_track_from_livekit(track),
350            publication: RemoteTrackPublication(publication),
351            participant: RemoteParticipant(participant),
352        },
353        livekit::RoomEvent::TrackSubscriptionFailed {
354            participant,
355            error: _,
356            track_sid,
357        } => RoomEvent::TrackSubscriptionFailed {
358            participant: RemoteParticipant(participant),
359            track_sid,
360        },
361        livekit::RoomEvent::TrackPublished {
362            publication,
363            participant,
364        } => RoomEvent::TrackPublished {
365            publication: RemoteTrackPublication(publication),
366            participant: RemoteParticipant(participant),
367        },
368        livekit::RoomEvent::TrackUnpublished {
369            publication,
370            participant,
371        } => RoomEvent::TrackUnpublished {
372            publication: RemoteTrackPublication(publication),
373            participant: RemoteParticipant(participant),
374        },
375        livekit::RoomEvent::TrackMuted {
376            participant,
377            publication,
378        } => RoomEvent::TrackMuted {
379            publication: publication_from_livekit(publication),
380            participant: participant_from_livekit(participant),
381        },
382        livekit::RoomEvent::TrackUnmuted {
383            participant,
384            publication,
385        } => RoomEvent::TrackUnmuted {
386            publication: publication_from_livekit(publication),
387            participant: participant_from_livekit(participant),
388        },
389        livekit::RoomEvent::RoomMetadataChanged {
390            old_metadata,
391            metadata,
392        } => RoomEvent::RoomMetadataChanged {
393            old_metadata,
394            metadata,
395        },
396        livekit::RoomEvent::ParticipantMetadataChanged {
397            participant,
398            old_metadata,
399            metadata,
400        } => RoomEvent::ParticipantMetadataChanged {
401            participant: participant_from_livekit(participant),
402            old_metadata,
403            metadata,
404        },
405        livekit::RoomEvent::ParticipantNameChanged {
406            participant,
407            old_name,
408            name,
409        } => RoomEvent::ParticipantNameChanged {
410            participant: participant_from_livekit(participant),
411            old_name,
412            name,
413        },
414        livekit::RoomEvent::ParticipantAttributesChanged {
415            participant,
416            changed_attributes,
417        } => RoomEvent::ParticipantAttributesChanged {
418            participant: participant_from_livekit(participant),
419            changed_attributes: changed_attributes.into_iter().collect(),
420        },
421        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
422            RoomEvent::ActiveSpeakersChanged {
423                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
424            }
425        }
426        livekit::RoomEvent::Connected {
427            participants_with_tracks,
428        } => RoomEvent::Connected {
429            participants_with_tracks: participants_with_tracks
430                .into_iter()
431                .map({
432                    |(p, t)| {
433                        (
434                            RemoteParticipant(p),
435                            t.into_iter().map(RemoteTrackPublication).collect(),
436                        )
437                    }
438                })
439                .collect(),
440        },
441        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
442            reason: reason.as_str_name(),
443        },
444        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
445        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
446        _ => {
447            log::trace!("dropping livekit event: {:?}", event);
448            return None;
449        }
450    };
451
452    Some(event)
453}