livekit_client.rs

  1use std::sync::Arc;
  2
  3use anyhow::{Context as _, Result, anyhow};
  4use audio::AudioSettings;
  5use collections::HashMap;
  6use futures::{SinkExt, channel::mpsc};
  7use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
  8use gpui_tokio::Tokio;
  9use log::info;
 10use playback::capture_local_video_track;
 11use settings::Settings;
 12
 13mod playback;
 14
 15use crate::{
 16    LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
 17    livekit_client::playback::Speaker,
 18};
 19pub use playback::AudioStream;
 20pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
 21
/// Newtype over [`livekit::track::RemoteVideoTrack`].
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype over [`livekit::track::RemoteAudioTrack`].
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype over [`livekit::publication::RemoteTrackPublication`].
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype over [`livekit::participant::RemoteParticipant`].
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
 30
/// Newtype over [`livekit::track::LocalVideoTrack`].
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype over [`livekit::track::LocalAudioTrack`].
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype over [`livekit::publication::LocalTrackPublication`].
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype over [`livekit::participant::LocalParticipant`].
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
 39
/// A connected LiveKit room together with the resources created alongside it in
/// [`Room::connect`].
pub struct Room {
    /// The underlying livekit room handle.
    room: livekit::Room,
    /// Background task that converts livekit events and forwards them to the receiver
    /// returned by [`Room::connect`]. Never read; stored only so the handle is owned by
    /// the room.
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and to play remote audio tracks.
    playback: playback::AudioStack,
}
 45
/// Alias for livekit's track SID type ([`livekit::id::TrackSid`]).
pub type TrackSid = livekit::id::TrackSid;
/// Alias for livekit's connection state type ([`livekit::ConnectionState`]).
pub type ConnectionState = livekit::ConnectionState;
/// Identity of a participant, stored as a plain string.
///
/// Within this file it is always built from the inner string of a livekit participant
/// identity.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
 50
 51impl Room {
 52    pub async fn connect(
 53        url: String,
 54        token: String,
 55        cx: &mut AsyncApp,
 56    ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
 57        let connector =
 58            tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
 59        let mut config = livekit::RoomOptions::default();
 60        config.connector = Some(connector);
 61        let (room, mut events) = Tokio::spawn(cx, async move {
 62            livekit::Room::connect(&url, &token, config).await
 63        })?
 64        .await??;
 65
 66        let (mut tx, rx) = mpsc::unbounded();
 67        let task = cx.background_executor().spawn(async move {
 68            while let Some(event) = events.recv().await {
 69                if let Some(event) = room_event_from_livekit(event) {
 70                    tx.send(event).await.ok();
 71                }
 72            }
 73        });
 74
 75        Ok((
 76            Self {
 77                room,
 78                _task: task,
 79                playback: playback::AudioStack::new(cx.background_executor().clone()),
 80            },
 81            rx,
 82        ))
 83    }
 84
 85    pub fn local_participant(&self) -> LocalParticipant {
 86        LocalParticipant(self.room.local_participant())
 87    }
 88
 89    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
 90        self.room
 91            .remote_participants()
 92            .into_iter()
 93            .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
 94            .collect()
 95    }
 96
 97    pub fn connection_state(&self) -> ConnectionState {
 98        self.room.connection_state()
 99    }
100
101    pub async fn publish_local_microphone_track(
102        &self,
103        user_name: String,
104        is_staff: bool,
105        cx: &mut AsyncApp,
106    ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
107        let (track, stream) = self
108            .playback
109            .capture_local_microphone_track(user_name, is_staff, &cx)?;
110        let publication = self
111            .local_participant()
112            .publish_track(
113                livekit::track::LocalTrack::Audio(track.0),
114                livekit::options::TrackPublishOptions {
115                    source: livekit::track::TrackSource::Microphone,
116                    ..Default::default()
117                },
118                cx,
119            )
120            .await?;
121
122        Ok((publication, stream))
123    }
124
125    pub async fn unpublish_local_track(
126        &self,
127        sid: TrackSid,
128        cx: &mut AsyncApp,
129    ) -> Result<LocalTrackPublication> {
130        self.local_participant().unpublish_track(sid, cx).await
131    }
132
133    pub fn play_remote_audio_track(
134        &self,
135        track: &RemoteAudioTrack,
136        cx: &mut App,
137    ) -> Result<playback::AudioStream> {
138        let speaker: Speaker =
139            serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
140                name: track.0.name(),
141                is_staff: false,
142                sends_legacy_audio: true,
143            });
144
145        if AudioSettings::get_global(cx).rodio_audio {
146            info!("Using experimental.rodio_audio audio pipeline for output");
147            playback::play_remote_audio_track(&track.0, speaker, cx)
148        } else if speaker.sends_legacy_audio {
149            Ok(self.playback.play_remote_audio_track(&track.0))
150        } else {
151            Err(anyhow!("Client version too old to play audio in call"))
152        }
153    }
154}
155
156impl LocalParticipant {
157    pub async fn publish_screenshare_track(
158        &self,
159        source: &dyn ScreenCaptureSource,
160        cx: &mut AsyncApp,
161    ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
162        let (track, stream) = capture_local_video_track(source, cx).await?;
163        let options = livekit::options::TrackPublishOptions {
164            source: livekit::track::TrackSource::Screenshare,
165            video_codec: livekit::options::VideoCodec::VP8,
166            ..Default::default()
167        };
168        let publication = self
169            .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
170            .await?;
171
172        Ok((publication, stream))
173    }
174
175    async fn publish_track(
176        &self,
177        track: livekit::track::LocalTrack,
178        options: livekit::options::TrackPublishOptions,
179        cx: &mut AsyncApp,
180    ) -> Result<LocalTrackPublication> {
181        let participant = self.0.clone();
182        Tokio::spawn(cx, async move {
183            participant.publish_track(track, options).await
184        })?
185        .await?
186        .map(LocalTrackPublication)
187        .context("publishing a track")
188    }
189
190    pub async fn unpublish_track(
191        &self,
192        sid: TrackSid,
193        cx: &mut AsyncApp,
194    ) -> Result<LocalTrackPublication> {
195        let participant = self.0.clone();
196        Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
197            .await?
198            .map(LocalTrackPublication)
199            .context("unpublishing a track")
200    }
201}
202
203impl LocalTrackPublication {
204    pub fn mute(&self, cx: &App) {
205        let track = self.0.clone();
206        Tokio::spawn(cx, async move {
207            track.mute();
208        })
209        .detach();
210    }
211
212    pub fn unmute(&self, cx: &App) {
213        let track = self.0.clone();
214        Tokio::spawn(cx, async move {
215            track.unmute();
216        })
217        .detach();
218    }
219
220    pub fn sid(&self) -> TrackSid {
221        self.0.sid()
222    }
223
224    pub fn is_muted(&self) -> bool {
225        self.0.is_muted()
226    }
227}
228
229impl RemoteParticipant {
230    pub fn identity(&self) -> ParticipantIdentity {
231        ParticipantIdentity(self.0.identity().0)
232    }
233
234    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
235        self.0
236            .track_publications()
237            .into_iter()
238            .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
239            .collect()
240    }
241}
242
243impl RemoteAudioTrack {
244    pub fn sid(&self) -> TrackSid {
245        self.0.sid()
246    }
247}
248
249impl RemoteVideoTrack {
250    pub fn sid(&self) -> TrackSid {
251        self.0.sid()
252    }
253}
254
255impl RemoteTrackPublication {
256    pub fn is_muted(&self) -> bool {
257        self.0.is_muted()
258    }
259
260    pub fn is_enabled(&self) -> bool {
261        self.0.is_enabled()
262    }
263
264    pub fn track(&self) -> Option<RemoteTrack> {
265        self.0.track().map(remote_track_from_livekit)
266    }
267
268    pub fn is_audio(&self) -> bool {
269        self.0.kind() == livekit::track::TrackKind::Audio
270    }
271
272    pub fn set_enabled(&self, enabled: bool, cx: &App) {
273        let track = self.0.clone();
274        Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
275    }
276
277    pub fn sid(&self) -> TrackSid {
278        self.0.sid()
279    }
280}
281
282impl Participant {
283    pub fn identity(&self) -> ParticipantIdentity {
284        match self {
285            Participant::Local(local_participant) => {
286                ParticipantIdentity(local_participant.0.identity().0)
287            }
288            Participant::Remote(remote_participant) => {
289                ParticipantIdentity(remote_participant.0.identity().0)
290            }
291        }
292    }
293}
294
295fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
296    match participant {
297        livekit::participant::Participant::Local(local) => {
298            Participant::Local(LocalParticipant(local))
299        }
300        livekit::participant::Participant::Remote(remote) => {
301            Participant::Remote(RemoteParticipant(remote))
302        }
303    }
304}
305
306fn publication_from_livekit(
307    publication: livekit::publication::TrackPublication,
308) -> TrackPublication {
309    match publication {
310        livekit::publication::TrackPublication::Local(local) => {
311            TrackPublication::Local(LocalTrackPublication(local))
312        }
313        livekit::publication::TrackPublication::Remote(remote) => {
314            TrackPublication::Remote(RemoteTrackPublication(remote))
315        }
316    }
317}
318
319fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
320    match track {
321        livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
322        livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
323    }
324}
325
326fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
327    match track {
328        livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
329        livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
330    }
331}
332fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
333    let event = match event {
334        livekit::RoomEvent::ParticipantConnected(remote_participant) => {
335            RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
336        }
337        livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
338            RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
339        }
340        livekit::RoomEvent::LocalTrackPublished {
341            publication,
342            track,
343            participant,
344        } => RoomEvent::LocalTrackPublished {
345            publication: LocalTrackPublication(publication),
346            track: local_track_from_livekit(track),
347            participant: LocalParticipant(participant),
348        },
349        livekit::RoomEvent::LocalTrackUnpublished {
350            publication,
351            participant,
352        } => RoomEvent::LocalTrackUnpublished {
353            publication: LocalTrackPublication(publication),
354            participant: LocalParticipant(participant),
355        },
356        livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
357            track: local_track_from_livekit(track),
358        },
359        livekit::RoomEvent::TrackSubscribed {
360            track,
361            publication,
362            participant,
363        } => RoomEvent::TrackSubscribed {
364            track: remote_track_from_livekit(track),
365            publication: RemoteTrackPublication(publication),
366            participant: RemoteParticipant(participant),
367        },
368        livekit::RoomEvent::TrackUnsubscribed {
369            track,
370            publication,
371            participant,
372        } => RoomEvent::TrackUnsubscribed {
373            track: remote_track_from_livekit(track),
374            publication: RemoteTrackPublication(publication),
375            participant: RemoteParticipant(participant),
376        },
377        livekit::RoomEvent::TrackSubscriptionFailed {
378            participant,
379            error: _,
380            track_sid,
381        } => RoomEvent::TrackSubscriptionFailed {
382            participant: RemoteParticipant(participant),
383            track_sid,
384        },
385        livekit::RoomEvent::TrackPublished {
386            publication,
387            participant,
388        } => RoomEvent::TrackPublished {
389            publication: RemoteTrackPublication(publication),
390            participant: RemoteParticipant(participant),
391        },
392        livekit::RoomEvent::TrackUnpublished {
393            publication,
394            participant,
395        } => RoomEvent::TrackUnpublished {
396            publication: RemoteTrackPublication(publication),
397            participant: RemoteParticipant(participant),
398        },
399        livekit::RoomEvent::TrackMuted {
400            participant,
401            publication,
402        } => RoomEvent::TrackMuted {
403            publication: publication_from_livekit(publication),
404            participant: participant_from_livekit(participant),
405        },
406        livekit::RoomEvent::TrackUnmuted {
407            participant,
408            publication,
409        } => RoomEvent::TrackUnmuted {
410            publication: publication_from_livekit(publication),
411            participant: participant_from_livekit(participant),
412        },
413        livekit::RoomEvent::RoomMetadataChanged {
414            old_metadata,
415            metadata,
416        } => RoomEvent::RoomMetadataChanged {
417            old_metadata,
418            metadata,
419        },
420        livekit::RoomEvent::ParticipantMetadataChanged {
421            participant,
422            old_metadata,
423            metadata,
424        } => RoomEvent::ParticipantMetadataChanged {
425            participant: participant_from_livekit(participant),
426            old_metadata,
427            metadata,
428        },
429        livekit::RoomEvent::ParticipantNameChanged {
430            participant,
431            old_name,
432            name,
433        } => RoomEvent::ParticipantNameChanged {
434            participant: participant_from_livekit(participant),
435            old_name,
436            name,
437        },
438        livekit::RoomEvent::ParticipantAttributesChanged {
439            participant,
440            changed_attributes,
441        } => RoomEvent::ParticipantAttributesChanged {
442            participant: participant_from_livekit(participant),
443            changed_attributes: changed_attributes.into_iter().collect(),
444        },
445        livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
446            RoomEvent::ActiveSpeakersChanged {
447                speakers: speakers.into_iter().map(participant_from_livekit).collect(),
448            }
449        }
450        livekit::RoomEvent::Connected {
451            participants_with_tracks,
452        } => RoomEvent::Connected {
453            participants_with_tracks: participants_with_tracks
454                .into_iter()
455                .map({
456                    |(p, t)| {
457                        (
458                            RemoteParticipant(p),
459                            t.into_iter().map(RemoteTrackPublication).collect(),
460                        )
461                    }
462                })
463                .collect(),
464        },
465        livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
466            reason: reason.as_str_name(),
467        },
468        livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
469        livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
470        _ => {
471            log::trace!("dropping livekit event: {:?}", event);
472            return None;
473        }
474    };
475
476    Some(event)
477}