livekit_client.rs

  1use std::sync::Arc;
  2
  3use anyhow::Result;
  4use collections::HashMap;
  5use futures::{SinkExt, channel::mpsc};
  6use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  7use gpui_tokio::Tokio;
  8use playback::capture_local_video_track;
  9
 10mod playback;
 11
 12use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
 13pub use playback::AudioStream;
 14pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 15
 16#[derive(Clone, Debug)]
 17pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
 18#[derive(Clone, Debug)]
 19pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
 20#[derive(Clone, Debug)]
 21pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
 22#[derive(Clone, Debug)]
 23pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 24
 25#[derive(Clone, Debug)]
 26pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
 27#[derive(Clone, Debug)]
 28pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
 29#[derive(Clone, Debug)]
 30pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
 31#[derive(Clone, Debug)]
 32pub struct LocalParticipant(livekit::participant::LocalParticipant);
 33
 34pub struct Room {
 35    room: livekit::Room,
 36    _task: Task<()>,
 37    playback: playback::AudioStack,
 38}
 39
 40pub type TrackSid = livekit::id::TrackSid;
 41pub type ConnectionState = livekit::ConnectionState;
 42#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
 43pub struct ParticipantIdentity(pub String);
 44
 45impl Room {
 46    pub async fn connect(
 47        url: String,
 48        token: String,
 49        cx: &mut AsyncApp,
 50    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 51        let connector =
 52            tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
 53        let mut config = livekit::RoomOptions::default();
 54        config.connector = Some(connector);
 55        let (room, mut events) = Tokio::spawn(cx, async move {
 56            livekit::Room::connect(&url, &token, config).await
 57        })?
 58        .await??;
 59
 60        let (mut tx, rx) = mpsc::unbounded();
 61        let task = cx.background_executor().spawn(async move {
 62            while let Some(event) = events.recv().await {
 63                if let Some(event) = room_event_from_livekit(event) {
 64                    tx.send(event).await.ok();
 65                }
 66            }
 67        });
 68
 69        Ok((
 70            Self {
 71                room,
 72                _task: task,
 73                playback: playback::AudioStack::new(cx.background_executor().clone()),
 74            },
 75            rx,
 76        ))
 77    }
 78
 79    pub fn local_participant(&self) -> LocalParticipant {
 80        LocalParticipant(self.room.local_participant())
 81    }
 82
 83    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 84        self.room
 85            .remote_participants()
 86            .into_iter()
 87            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 88            .collect()
 89    }
 90
 91    pub fn connection_state(&self) -> ConnectionState {
 92        self.room.connection_state()
 93    }
 94
 95    pub async fn publish_local_microphone_track(
 96        &self,
 97        cx: &mut AsyncApp,
 98    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
 99        let (track, stream) = self.playback.capture_local_microphone_track()?;
100        let publication = self
101            .local_participant()
102            .publish_track(
103                livekit::track::LocalTrack::Audio(track.0),
104                livekit::options::TrackPublishOptions {
105                    source: livekit::track::TrackSource::Microphone,
106                    ..Default::default()
107                },
108                cx,
109            )
110            .await?;
111
112        Ok((publication, stream))
113    }
114
115    pub async fn unpublish_local_track(
116        &self,
117        sid: TrackSid,
118        cx: &mut AsyncApp,
119    ) -> Result<LocalTrackPublication> {
120        self.local_participant().unpublish_track(sid, cx).await
121    }
122
123    pub fn play_remote_audio_track(
124        &self,
125        track: &RemoteAudioTrack,
126        _cx: &App,
127    ) -> Result<playback::AudioStream> {
128        Ok(self.playback.play_remote_audio_track(&track.0))
129    }
130}
131
132impl LocalParticipant {
133    pub async fn publish_screenshare_track(
134        &self,
135        source: &dyn ScreenCaptureSource,
136        cx: &mut AsyncApp,
137    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
138        let (track, stream) = capture_local_video_track(source, cx).await?;
139        let options = livekit::options::TrackPublishOptions {
140            source: livekit::track::TrackSource::Screenshare,
141            video_codec: livekit::options::VideoCodec::VP8,
142            ..Default::default()
143        };
144        let publication = self
145            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
146            .await?;
147
148        Ok((publication, stream))
149    }
150
151    async fn publish_track(
152        &self,
153        track: livekit::track::LocalTrack,
154        options: livekit::options::TrackPublishOptions,
155        cx: &mut AsyncApp,
156    ) -> Result<LocalTrackPublication> {
157        let participant = self.0.clone();
158        Tokio::spawn(cx, async move {
159            participant.publish_track(track, options).await
160        })?
161        .await?
162        .map(LocalTrackPublication)
163        .map_err(|error| anyhow::anyhow!("failed to publish track: {error}"))
164    }
165
166    pub async fn unpublish_track(
167        &self,
168        sid: TrackSid,
169        cx: &mut AsyncApp,
170    ) -> Result<LocalTrackPublication> {
171        let participant = self.0.clone();
172        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
173            .await?
174            .map(LocalTrackPublication)
175            .map_err(|error| anyhow::anyhow!("failed to unpublish track: {error}"))
176    }
177}
178
179impl LocalTrackPublication {
180    pub fn mute(&self, cx: &App) {
181        let track = self.0.clone();
182        Tokio::spawn(cx, async move {
183            track.mute();
184        })
185        .detach();
186    }
187
188    pub fn unmute(&self, cx: &App) {
189        let track = self.0.clone();
190        Tokio::spawn(cx, async move {
191            track.unmute();
192        })
193        .detach();
194    }
195
196    pub fn sid(&self) -> TrackSid {
197        self.0.sid()
198    }
199
200    pub fn is_muted(&self) -> bool {
201        self.0.is_muted()
202    }
203}
204
205impl RemoteParticipant {
206    pub fn identity(&self) -> ParticipantIdentity {
207        ParticipantIdentity(self.0.identity().0)
208    }
209
210    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
211        self.0
212            .track_publications()
213            .into_iter()
214            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
215            .collect()
216    }
217}
218
219impl RemoteAudioTrack {
220    pub fn sid(&self) -> TrackSid {
221        self.0.sid()
222    }
223}
224
225impl RemoteVideoTrack {
226    pub fn sid(&self) -> TrackSid {
227        self.0.sid()
228    }
229}
230
231impl RemoteTrackPublication {
232    pub fn is_muted(&self) -> bool {
233        self.0.is_muted()
234    }
235
236    pub fn is_enabled(&self) -> bool {
237        self.0.is_enabled()
238    }
239
240    pub fn track(&self) -> Option<RemoteTrack> {
241        self.0.track().map(remote_track_from_livekit)
242    }
243
244    pub fn is_audio(&self) -> bool {
245        self.0.kind() == livekit::track::TrackKind::Audio
246    }
247
248    pub fn set_enabled(&self, enabled: bool, cx: &App) {
249        let track = self.0.clone();
250        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
251    }
252
253    pub fn sid(&self) -> TrackSid {
254        self.0.sid()
255    }
256}
257
258impl RemoteTrack {
259    pub fn set_enabled(&self, enabled: bool, cx: &App) {
260        let this = self.clone();
261        Tokio::spawn(cx, async move {
262            match this {
263                RemoteTrack::Audio(remote_audio_track) => {
264                    remote_audio_track.0.rtc_track().set_enabled(enabled)
265                }
266                RemoteTrack::Video(remote_video_track) => {
267                    remote_video_track.0.rtc_track().set_enabled(enabled)
268                }
269            }
270        })
271        .detach();
272    }
273}
274
275impl Participant {
276    pub fn identity(&self) -> ParticipantIdentity {
277        match self {
278            Participant::Local(local_participant) => {
279                ParticipantIdentity(local_participant.0.identity().0)
280            }
281            Participant::Remote(remote_participant) => {
282                ParticipantIdentity(remote_participant.0.identity().0)
283            }
284        }
285    }
286}
287
288fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
289    match participant {
290        livekit::participant::Participant::Local(local) => {
291            Participant::Local(LocalParticipant(local))
292        }
293        livekit::participant::Participant::Remote(remote) => {
294            Participant::Remote(RemoteParticipant(remote))
295        }
296    }
297}
298
299fn publication_from_livekit(
300    publication: livekit::publication::TrackPublication,
301) -> TrackPublication {
302    match publication {
303        livekit::publication::TrackPublication::Local(local) => {
304            TrackPublication::Local(LocalTrackPublication(local))
305        }
306        livekit::publication::TrackPublication::Remote(remote) => {
307            TrackPublication::Remote(RemoteTrackPublication(remote))
308        }
309    }
310}
311
312fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
313    match track {
314        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
315        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
316    }
317}
318
319fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
320    match track {
321        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
322        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
323    }
324}
325fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
326    let event = match event {
327        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
328            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
329        }
330        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
331            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
332        }
333        livekit::RoomEvent::LocalTrackPublished {
334            publication,
335            track,
336            participant,
337        } => RoomEvent::LocalTrackPublished {
338            publication: LocalTrackPublication(publication),
339            track: local_track_from_livekit(track),
340            participant: LocalParticipant(participant),
341        },
342        livekit::RoomEvent::LocalTrackUnpublished {
343            publication,
344            participant,
345        } => RoomEvent::LocalTrackUnpublished {
346            publication: LocalTrackPublication(publication),
347            participant: LocalParticipant(participant),
348        },
349        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
350            track: local_track_from_livekit(track),
351        },
352        livekit::RoomEvent::TrackSubscribed {
353            track,
354            publication,
355            participant,
356        } => RoomEvent::TrackSubscribed {
357            track: remote_track_from_livekit(track),
358            publication: RemoteTrackPublication(publication),
359            participant: RemoteParticipant(participant),
360        },
361        livekit::RoomEvent::TrackUnsubscribed {
362            track,
363            publication,
364            participant,
365        } => RoomEvent::TrackUnsubscribed {
366            track: remote_track_from_livekit(track),
367            publication: RemoteTrackPublication(publication),
368            participant: RemoteParticipant(participant),
369        },
370        livekit::RoomEvent::TrackSubscriptionFailed {
371            participant,
372            error: _,
373            track_sid,
374        } => RoomEvent::TrackSubscriptionFailed {
375            participant: RemoteParticipant(participant),
376            track_sid,
377        },
378        livekit::RoomEvent::TrackPublished {
379            publication,
380            participant,
381        } => RoomEvent::TrackPublished {
382            publication: RemoteTrackPublication(publication),
383            participant: RemoteParticipant(participant),
384        },
385        livekit::RoomEvent::TrackUnpublished {
386            publication,
387            participant,
388        } => RoomEvent::TrackUnpublished {
389            publication: RemoteTrackPublication(publication),
390            participant: RemoteParticipant(participant),
391        },
392        livekit::RoomEvent::TrackMuted {
393            participant,
394            publication,
395        } => RoomEvent::TrackMuted {
396            publication: publication_from_livekit(publication),
397            participant: participant_from_livekit(participant),
398        },
399        livekit::RoomEvent::TrackUnmuted {
400            participant,
401            publication,
402        } => RoomEvent::TrackUnmuted {
403            publication: publication_from_livekit(publication),
404            participant: participant_from_livekit(participant),
405        },
406        livekit::RoomEvent::RoomMetadataChanged {
407            old_metadata,
408            metadata,
409        } => RoomEvent::RoomMetadataChanged {
410            old_metadata,
411            metadata,
412        },
413        livekit::RoomEvent::ParticipantMetadataChanged {
414            participant,
415            old_metadata,
416            metadata,
417        } => RoomEvent::ParticipantMetadataChanged {
418            participant: participant_from_livekit(participant),
419            old_metadata,
420            metadata,
421        },
422        livekit::RoomEvent::ParticipantNameChanged {
423            participant,
424            old_name,
425            name,
426        } => RoomEvent::ParticipantNameChanged {
427            participant: participant_from_livekit(participant),
428            old_name,
429            name,
430        },
431        livekit::RoomEvent::ParticipantAttributesChanged {
432            participant,
433            changed_attributes,
434        } => RoomEvent::ParticipantAttributesChanged {
435            participant: participant_from_livekit(participant),
436            changed_attributes: changed_attributes.into_iter().collect(),
437        },
438        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
439            RoomEvent::ActiveSpeakersChanged {
440                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
441            }
442        }
443        livekit::RoomEvent::Connected {
444            participants_with_tracks,
445        } => RoomEvent::Connected {
446            participants_with_tracks: participants_with_tracks
447                .into_iter()
448                .map({
449                    |(p, t)| {
450                        (
451                            RemoteParticipant(p),
452                            t.into_iter().map(RemoteTrackPublication).collect(),
453                        )
454                    }
455                })
456                .collect(),
457        },
458        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
459            reason: reason.as_str_name(),
460        },
461        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
462        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
463        _ => {
464            log::trace!("dropping livekit event: {:?}", event);
465            return None;
466        }
467    };
468
469    Some(event)
470}