livekit_client.rs

  1use std::sync::Arc;
  2
  3use anyhow::Result;
  4use collections::HashMap;
  5use futures::{SinkExt, channel::mpsc};
  6use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  7use gpui_tokio::Tokio;
  8use playback::capture_local_video_track;
  9
 10mod playback;
 11
 12use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
 13pub use playback::AudioStream;
 14pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 15
/// Newtype wrapper around `livekit::track::RemoteVideoTrack`.
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype wrapper around `livekit::track::RemoteAudioTrack`.
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype wrapper around `livekit::publication::RemoteTrackPublication`.
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype wrapper around `livekit::participant::RemoteParticipant`.
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 24
/// Newtype wrapper around `livekit::track::LocalVideoTrack`.
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype wrapper around `livekit::track::LocalAudioTrack`.
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype wrapper around `livekit::publication::LocalTrackPublication`.
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype wrapper around `livekit::participant::LocalParticipant`.
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
 33
/// A connected LiveKit room plus the resources this wrapper keeps alongside it.
pub struct Room {
    /// The underlying LiveKit SDK room.
    room: livekit::Room,
    /// Background task that converts LiveKit events and forwards them to the
    /// receiver returned by `Room::connect`; stored so the handle lives as
    /// long as the room.
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and to play remote
    /// audio tracks.
    playback: playback::AudioStack,
}
 39
/// Track identifier; alias for `livekit::id::TrackSid`.
pub type TrackSid = livekit::id::TrackSid;
/// Room connection state; alias for `livekit::ConnectionState`.
pub type ConnectionState = livekit::ConnectionState;
/// Identity of a room participant, stored as a plain `String`.
///
/// In this file it is built from the inner string of the LiveKit SDK's
/// participant identity.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
 44
 45impl Room {
 46    pub async fn connect(
 47        url: String,
 48        token: String,
 49        cx: &mut AsyncApp,
 50    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 51        let connector =
 52            tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
 53        let mut config = livekit::RoomOptions::default();
 54        config.connector = Some(connector);
 55        let (room, mut events) = Tokio::spawn(cx, async move {
 56            livekit::Room::connect(&url, &token, config).await
 57        })?
 58        .await??;
 59
 60        let (mut tx, rx) = mpsc::unbounded();
 61        let task = cx.background_executor().spawn(async move {
 62            while let Some(event) = events.recv().await {
 63                if let Some(event) = room_event_from_livekit(event) {
 64                    tx.send(event.into()).await.ok();
 65                }
 66            }
 67        });
 68
 69        Ok((
 70            Self {
 71                room,
 72                _task: task,
 73                playback: playback::AudioStack::new(cx.background_executor().clone()),
 74            },
 75            rx,
 76        ))
 77    }
 78
 79    pub fn local_participant(&self) -> LocalParticipant {
 80        LocalParticipant(self.room.local_participant())
 81    }
 82
 83    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 84        self.room
 85            .remote_participants()
 86            .into_iter()
 87            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 88            .collect()
 89    }
 90
 91    pub fn connection_state(&self) -> ConnectionState {
 92        self.room.connection_state()
 93    }
 94
 95    pub async fn publish_local_microphone_track(
 96        &self,
 97        cx: &mut AsyncApp,
 98    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
 99        let (track, stream) = self.playback.capture_local_microphone_track()?;
100        let publication = self
101            .local_participant()
102            .publish_track(
103                livekit::track::LocalTrack::Audio(track.0),
104                livekit::options::TrackPublishOptions {
105                    source: livekit::track::TrackSource::Microphone,
106                    ..Default::default()
107                },
108                cx,
109            )
110            .await?;
111
112        Ok((publication, stream))
113    }
114
115    // pub async fn publish_local_wav_track(
116    //     &self,
117    //     cx: &mut AsyncApp,
118    // ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
119    //     let apm = self.apm.clone();
120    //     let executor = cx.background_executor().clone();
121    //     let (track, stream) =
122    //         Tokio::spawn(
123    //             cx,
124    //             async move { capture_local_wav_track(apm, &executor).await },
125    //         )?
126    //         .await??;
127    //     let publication = self
128    //         .local_participant()
129    //         .publish_track(
130    //             livekit::track::LocalTrack::Audio(track.0),
131    //             livekit::options::TrackPublishOptions {
132    //                 source: livekit::track::TrackSource::Microphone,
133    //                 ..Default::default()
134    //             },
135    //             cx,
136    //         )
137    //         .await?;
138
139    //     Ok((publication, stream))
140    // }
141
142    pub async fn unpublish_local_track(
143        &self,
144        sid: TrackSid,
145        cx: &mut AsyncApp,
146    ) -> Result<LocalTrackPublication> {
147        self.local_participant().unpublish_track(sid, cx).await
148    }
149
150    pub fn play_remote_audio_track(
151        &self,
152        track: &RemoteAudioTrack,
153        _cx: &App,
154    ) -> Result<playback::AudioStream> {
155        Ok(self.playback.play_remote_audio_track(&track.0))
156    }
157}
158
159impl LocalParticipant {
160    pub async fn publish_screenshare_track(
161        &self,
162        source: &dyn ScreenCaptureSource,
163        cx: &mut AsyncApp,
164    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
165        let (track, stream) = capture_local_video_track(&*source, cx).await?;
166        let options = livekit::options::TrackPublishOptions {
167            source: livekit::track::TrackSource::Screenshare,
168            video_codec: livekit::options::VideoCodec::VP8,
169            ..Default::default()
170        };
171        let publication = self
172            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
173            .await?;
174
175        Ok((publication, stream))
176    }
177
178    async fn publish_track(
179        &self,
180        track: livekit::track::LocalTrack,
181        options: livekit::options::TrackPublishOptions,
182        cx: &mut AsyncApp,
183    ) -> Result<LocalTrackPublication> {
184        let participant = self.0.clone();
185        Tokio::spawn(cx, async move {
186            participant.publish_track(track, options).await
187        })?
188        .await?
189        .map(|p| LocalTrackPublication(p))
190        .map_err(|error| anyhow::anyhow!("failed to publish track: {error}"))
191    }
192
193    pub async fn unpublish_track(
194        &self,
195        sid: TrackSid,
196        cx: &mut AsyncApp,
197    ) -> Result<LocalTrackPublication> {
198        let participant = self.0.clone();
199        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
200            .await?
201            .map(|p| LocalTrackPublication(p))
202            .map_err(|error| anyhow::anyhow!("failed to unpublish track: {error}"))
203    }
204}
205
206impl LocalTrackPublication {
207    pub fn mute(&self, cx: &App) {
208        let track = self.0.clone();
209        Tokio::spawn(cx, async move {
210            track.mute();
211        })
212        .detach();
213    }
214
215    pub fn unmute(&self, cx: &App) {
216        let track = self.0.clone();
217        Tokio::spawn(cx, async move {
218            track.unmute();
219        })
220        .detach();
221    }
222
223    pub fn sid(&self) -> TrackSid {
224        self.0.sid()
225    }
226
227    pub fn is_muted(&self) -> bool {
228        self.0.is_muted()
229    }
230}
231
232impl RemoteParticipant {
233    pub fn identity(&self) -> ParticipantIdentity {
234        ParticipantIdentity(self.0.identity().0)
235    }
236
237    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
238        self.0
239            .track_publications()
240            .into_iter()
241            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
242            .collect()
243    }
244}
245
246impl RemoteAudioTrack {
247    pub fn sid(&self) -> TrackSid {
248        self.0.sid()
249    }
250}
251
252impl RemoteVideoTrack {
253    pub fn sid(&self) -> TrackSid {
254        self.0.sid()
255    }
256}
257
258impl RemoteTrackPublication {
259    pub fn is_muted(&self) -> bool {
260        self.0.is_muted()
261    }
262
263    pub fn is_enabled(&self) -> bool {
264        self.0.is_enabled()
265    }
266
267    pub fn track(&self) -> Option<RemoteTrack> {
268        self.0.track().map(remote_track_from_livekit)
269    }
270
271    pub fn is_audio(&self) -> bool {
272        self.0.kind() == livekit::track::TrackKind::Audio
273    }
274
275    pub fn set_enabled(&self, enabled: bool, cx: &App) {
276        let track = self.0.clone();
277        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
278    }
279
280    pub fn sid(&self) -> TrackSid {
281        self.0.sid()
282    }
283}
284
285impl RemoteTrack {
286    pub fn set_enabled(&self, enabled: bool, cx: &App) {
287        let this = self.clone();
288        Tokio::spawn(cx, async move {
289            match this {
290                RemoteTrack::Audio(remote_audio_track) => {
291                    remote_audio_track.0.rtc_track().set_enabled(enabled)
292                }
293                RemoteTrack::Video(remote_video_track) => {
294                    remote_video_track.0.rtc_track().set_enabled(enabled)
295                }
296            }
297        })
298        .detach();
299    }
300}
301
302impl Participant {
303    pub fn identity(&self) -> ParticipantIdentity {
304        match self {
305            Participant::Local(local_participant) => {
306                ParticipantIdentity(local_participant.0.identity().0)
307            }
308            Participant::Remote(remote_participant) => {
309                ParticipantIdentity(remote_participant.0.identity().0)
310            }
311        }
312    }
313}
314
315fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
316    match participant {
317        livekit::participant::Participant::Local(local) => {
318            Participant::Local(LocalParticipant(local))
319        }
320        livekit::participant::Participant::Remote(remote) => {
321            Participant::Remote(RemoteParticipant(remote))
322        }
323    }
324}
325
326fn publication_from_livekit(
327    publication: livekit::publication::TrackPublication,
328) -> TrackPublication {
329    match publication {
330        livekit::publication::TrackPublication::Local(local) => {
331            TrackPublication::Local(LocalTrackPublication(local))
332        }
333        livekit::publication::TrackPublication::Remote(remote) => {
334            TrackPublication::Remote(RemoteTrackPublication(remote))
335        }
336    }
337}
338
339fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
340    match track {
341        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
342        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
343    }
344}
345
346fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
347    match track {
348        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
349        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
350    }
351}
352fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
353    let event = match event {
354        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
355            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
356        }
357        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
358            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
359        }
360        livekit::RoomEvent::LocalTrackPublished {
361            publication,
362            track,
363            participant,
364        } => RoomEvent::LocalTrackPublished {
365            publication: LocalTrackPublication(publication),
366            track: local_track_from_livekit(track),
367            participant: LocalParticipant(participant),
368        },
369        livekit::RoomEvent::LocalTrackUnpublished {
370            publication,
371            participant,
372        } => RoomEvent::LocalTrackUnpublished {
373            publication: LocalTrackPublication(publication),
374            participant: LocalParticipant(participant),
375        },
376        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
377            track: local_track_from_livekit(track),
378        },
379        livekit::RoomEvent::TrackSubscribed {
380            track,
381            publication,
382            participant,
383        } => RoomEvent::TrackSubscribed {
384            track: remote_track_from_livekit(track),
385            publication: RemoteTrackPublication(publication),
386            participant: RemoteParticipant(participant),
387        },
388        livekit::RoomEvent::TrackUnsubscribed {
389            track,
390            publication,
391            participant,
392        } => RoomEvent::TrackUnsubscribed {
393            track: remote_track_from_livekit(track),
394            publication: RemoteTrackPublication(publication),
395            participant: RemoteParticipant(participant),
396        },
397        livekit::RoomEvent::TrackSubscriptionFailed {
398            participant,
399            error: _,
400            track_sid,
401        } => RoomEvent::TrackSubscriptionFailed {
402            participant: RemoteParticipant(participant),
403            track_sid,
404        },
405        livekit::RoomEvent::TrackPublished {
406            publication,
407            participant,
408        } => RoomEvent::TrackPublished {
409            publication: RemoteTrackPublication(publication),
410            participant: RemoteParticipant(participant),
411        },
412        livekit::RoomEvent::TrackUnpublished {
413            publication,
414            participant,
415        } => RoomEvent::TrackUnpublished {
416            publication: RemoteTrackPublication(publication),
417            participant: RemoteParticipant(participant),
418        },
419        livekit::RoomEvent::TrackMuted {
420            participant,
421            publication,
422        } => RoomEvent::TrackMuted {
423            publication: publication_from_livekit(publication),
424            participant: participant_from_livekit(participant),
425        },
426        livekit::RoomEvent::TrackUnmuted {
427            participant,
428            publication,
429        } => RoomEvent::TrackUnmuted {
430            publication: publication_from_livekit(publication),
431            participant: participant_from_livekit(participant),
432        },
433        livekit::RoomEvent::RoomMetadataChanged {
434            old_metadata,
435            metadata,
436        } => RoomEvent::RoomMetadataChanged {
437            old_metadata,
438            metadata,
439        },
440        livekit::RoomEvent::ParticipantMetadataChanged {
441            participant,
442            old_metadata,
443            metadata,
444        } => RoomEvent::ParticipantMetadataChanged {
445            participant: participant_from_livekit(participant),
446            old_metadata,
447            metadata,
448        },
449        livekit::RoomEvent::ParticipantNameChanged {
450            participant,
451            old_name,
452            name,
453        } => RoomEvent::ParticipantNameChanged {
454            participant: participant_from_livekit(participant),
455            old_name,
456            name,
457        },
458        livekit::RoomEvent::ParticipantAttributesChanged {
459            participant,
460            changed_attributes,
461        } => RoomEvent::ParticipantAttributesChanged {
462            participant: participant_from_livekit(participant),
463            changed_attributes: changed_attributes.into_iter().collect(),
464        },
465        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
466            RoomEvent::ActiveSpeakersChanged {
467                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
468            }
469        }
470        livekit::RoomEvent::Connected {
471            participants_with_tracks,
472        } => RoomEvent::Connected {
473            participants_with_tracks: participants_with_tracks
474                .into_iter()
475                .map({
476                    |(p, t)| {
477                        (
478                            RemoteParticipant(p),
479                            t.into_iter().map(|t| RemoteTrackPublication(t)).collect(),
480                        )
481                    }
482                })
483                .collect(),
484        },
485        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
486            reason: reason.as_str_name(),
487        },
488        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
489        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
490        _ => {
491            log::trace!("dropping livekit event: {:?}", event);
492            return None;
493        }
494    };
495
496    Some(event)
497}