1use anyhow::{Context as _, Result, anyhow};
2use audio::AudioSettings;
3use collections::HashMap;
4use futures::{SinkExt, channel::mpsc};
5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
6use gpui_tokio::Tokio;
7use log::info;
8use playback::capture_local_video_track;
9use settings::Settings;
10use std::sync::{Arc, atomic::AtomicU64};
11
12#[cfg(target_os = "linux")]
13mod linux;
14mod playback;
15
16use crate::{
17 ConnectionQuality, LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
18 livekit_client::playback::Speaker,
19};
20pub use livekit::SessionStats;
21pub use livekit::webrtc::stats::RtcStats;
22pub use playback::AudioStream;
23pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
24
25#[derive(Clone, Debug)]
26pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
27#[derive(Clone, Debug)]
28pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
29#[derive(Clone, Debug)]
30pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
31#[derive(Clone, Debug)]
32pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
33
34#[derive(Clone, Debug)]
35pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
36#[derive(Clone, Debug)]
37pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
38#[derive(Clone, Debug)]
39pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
40#[derive(Clone, Debug)]
41pub struct LocalParticipant(livekit::participant::LocalParticipant);
42
43pub struct Room {
44 room: livekit::Room,
45 _task: Task<()>,
46 playback: playback::AudioStack,
47}
48
49pub type TrackSid = livekit::id::TrackSid;
50pub type ConnectionState = livekit::ConnectionState;
51#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
52pub struct ParticipantIdentity(pub String);
53
54impl Room {
55 pub async fn connect(
56 url: String,
57 token: String,
58 cx: &mut AsyncApp,
59 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
60 let mut config = livekit::RoomOptions::default();
61 config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
62 let (room, mut events) = Tokio::spawn(cx, async move {
63 livekit::Room::connect(&url, &token, config).await
64 })
65 .await??;
66
67 let (mut tx, rx) = mpsc::unbounded();
68 let task = cx.background_executor().spawn(async move {
69 while let Some(event) = events.recv().await {
70 if let Some(event) = room_event_from_livekit(event) {
71 tx.send(event).await.ok();
72 }
73 }
74 });
75
76 Ok((
77 Self {
78 room,
79 _task: task,
80 playback: playback::AudioStack::new(cx.background_executor().clone()),
81 },
82 rx,
83 ))
84 }
85
86 pub fn local_participant(&self) -> LocalParticipant {
87 LocalParticipant(self.room.local_participant())
88 }
89
90 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
91 self.room
92 .remote_participants()
93 .into_iter()
94 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
95 .collect()
96 }
97
98 pub fn connection_state(&self) -> ConnectionState {
99 self.room.connection_state()
100 }
101
102 pub fn name(&self) -> String {
103 self.room.name()
104 }
105
106 pub async fn sid(&self) -> String {
107 self.room.sid().await.to_string()
108 }
109
110 pub async fn publish_local_microphone_track(
111 &self,
112 user_name: String,
113 is_staff: bool,
114 cx: &mut AsyncApp,
115 ) -> Result<(LocalTrackPublication, playback::AudioStream, Arc<AtomicU64>)> {
116 let (track, stream, input_lag_us) = self
117 .playback
118 .capture_local_microphone_track(user_name, is_staff, &cx)?;
119 let publication = self
120 .local_participant()
121 .publish_track(
122 livekit::track::LocalTrack::Audio(track.0),
123 livekit::options::TrackPublishOptions {
124 source: livekit::track::TrackSource::Microphone,
125 ..Default::default()
126 },
127 cx,
128 )
129 .await?;
130
131 Ok((publication, stream, input_lag_us))
132 }
133
134 pub async fn unpublish_local_track(
135 &self,
136 sid: TrackSid,
137 cx: &mut AsyncApp,
138 ) -> Result<LocalTrackPublication> {
139 self.local_participant().unpublish_track(sid, cx).await
140 }
141
142 pub fn play_remote_audio_track(
143 &self,
144 track: &RemoteAudioTrack,
145 cx: &mut App,
146 ) -> Result<playback::AudioStream> {
147 let speaker: Speaker =
148 serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
149 name: track.0.name(),
150 is_staff: false,
151 sends_legacy_audio: true,
152 });
153
154 if AudioSettings::get_global(cx).rodio_audio {
155 info!("Using experimental.rodio_audio audio pipeline for output");
156 playback::play_remote_audio_track(&track.0, speaker, cx)
157 } else if speaker.sends_legacy_audio {
158 let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
159 Ok(self
160 .playback
161 .play_remote_audio_track(&track.0, output_audio_device))
162 } else {
163 Err(anyhow!("Client version too old to play audio in call"))
164 }
165 }
166
167 pub async fn get_stats(&self) -> Result<livekit::SessionStats> {
168 self.room.get_stats().await.map_err(anyhow::Error::from)
169 }
170
171 /// Returns a `Task` that fetches room stats on the Tokio runtime.
172 ///
173 /// LiveKit's SDK is Tokio-based, so the stats fetch must run within
174 /// a Tokio context rather than on GPUI's smol-based background executor.
175 pub fn stats_task(&self, cx: &impl gpui::AppContext) -> Task<Result<livekit::SessionStats>> {
176 let inner = self.room.clone();
177 Tokio::spawn_result(cx, async move {
178 inner.get_stats().await.map_err(anyhow::Error::from)
179 })
180 }
181}
182
183impl LocalParticipant {
184 pub fn connection_quality(&self) -> ConnectionQuality {
185 connection_quality_from_livekit(self.0.connection_quality())
186 }
187
188 pub fn audio_level(&self) -> f32 {
189 self.0.audio_level()
190 }
191
192 pub async fn publish_screenshare_track(
193 &self,
194 source: &dyn ScreenCaptureSource,
195 cx: &mut AsyncApp,
196 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
197 let (track, stream) = capture_local_video_track(source, cx).await?;
198 let options = livekit::options::TrackPublishOptions {
199 source: livekit::track::TrackSource::Screenshare,
200 video_codec: livekit::options::VideoCodec::VP8,
201 ..Default::default()
202 };
203 let publication = self
204 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
205 .await?;
206
207 Ok((publication, stream))
208 }
209
210 async fn publish_track(
211 &self,
212 track: livekit::track::LocalTrack,
213 options: livekit::options::TrackPublishOptions,
214 cx: &mut AsyncApp,
215 ) -> Result<LocalTrackPublication> {
216 let participant = self.0.clone();
217 Tokio::spawn(cx, async move {
218 participant.publish_track(track, options).await
219 })
220 .await?
221 .map(LocalTrackPublication)
222 .context("publishing a track")
223 }
224
225 pub async fn unpublish_track(
226 &self,
227 sid: TrackSid,
228 cx: &mut AsyncApp,
229 ) -> Result<LocalTrackPublication> {
230 let participant = self.0.clone();
231 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
232 .await?
233 .map(LocalTrackPublication)
234 .context("unpublishing a track")
235 }
236
237 #[cfg(target_os = "linux")]
238 pub async fn publish_screenshare_track_wayland(
239 &self,
240 cx: &mut AsyncApp,
241 ) -> Result<(
242 LocalTrackPublication,
243 Box<dyn ScreenCaptureStream>,
244 futures::channel::oneshot::Receiver<()>,
245 )> {
246 let (track, stop_flag, feed_task, failure_rx) =
247 linux::start_wayland_desktop_capture(cx).await?;
248 let options = livekit::options::TrackPublishOptions {
249 source: livekit::track::TrackSource::Screenshare,
250 video_codec: livekit::options::VideoCodec::VP8,
251 ..Default::default()
252 };
253 let publication = self
254 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
255 .await?;
256
257 Ok((
258 publication,
259 Box::new(linux::WaylandScreenCaptureStream::new(stop_flag, feed_task)),
260 failure_rx,
261 ))
262 }
263}
264
265impl LocalTrackPublication {
266 pub fn mute(&self, cx: &App) {
267 let track = self.0.clone();
268 Tokio::spawn(cx, async move {
269 track.mute();
270 })
271 .detach();
272 }
273
274 pub fn unmute(&self, cx: &App) {
275 let track = self.0.clone();
276 Tokio::spawn(cx, async move {
277 track.unmute();
278 })
279 .detach();
280 }
281
282 pub fn sid(&self) -> TrackSid {
283 self.0.sid()
284 }
285
286 pub fn is_muted(&self) -> bool {
287 self.0.is_muted()
288 }
289}
290
291impl RemoteParticipant {
292 pub fn connection_quality(&self) -> ConnectionQuality {
293 connection_quality_from_livekit(self.0.connection_quality())
294 }
295
296 pub fn audio_level(&self) -> f32 {
297 self.0.audio_level()
298 }
299
300 pub fn identity(&self) -> ParticipantIdentity {
301 ParticipantIdentity(self.0.identity().0)
302 }
303
304 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
305 self.0
306 .track_publications()
307 .into_iter()
308 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
309 .collect()
310 }
311}
312
313impl RemoteAudioTrack {
314 pub fn sid(&self) -> TrackSid {
315 self.0.sid()
316 }
317}
318
319impl RemoteVideoTrack {
320 pub fn sid(&self) -> TrackSid {
321 self.0.sid()
322 }
323}
324
325impl RemoteTrackPublication {
326 pub fn is_muted(&self) -> bool {
327 self.0.is_muted()
328 }
329
330 pub fn is_enabled(&self) -> bool {
331 self.0.is_enabled()
332 }
333
334 pub fn track(&self) -> Option<RemoteTrack> {
335 self.0.track().map(remote_track_from_livekit)
336 }
337
338 pub fn is_audio(&self) -> bool {
339 self.0.kind() == livekit::track::TrackKind::Audio
340 }
341
342 pub fn set_enabled(&self, enabled: bool, cx: &App) {
343 let track = self.0.clone();
344 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
345 }
346
347 pub fn sid(&self) -> TrackSid {
348 self.0.sid()
349 }
350}
351
352impl Participant {
353 pub fn identity(&self) -> ParticipantIdentity {
354 match self {
355 Participant::Local(local_participant) => {
356 ParticipantIdentity(local_participant.0.identity().0)
357 }
358 Participant::Remote(remote_participant) => {
359 ParticipantIdentity(remote_participant.0.identity().0)
360 }
361 }
362 }
363
364 pub fn connection_quality(&self) -> ConnectionQuality {
365 match self {
366 Participant::Local(local_participant) => local_participant.connection_quality(),
367 Participant::Remote(remote_participant) => remote_participant.connection_quality(),
368 }
369 }
370
371 pub fn audio_level(&self) -> f32 {
372 match self {
373 Participant::Local(local_participant) => local_participant.audio_level(),
374 Participant::Remote(remote_participant) => remote_participant.audio_level(),
375 }
376 }
377}
378
379fn connection_quality_from_livekit(
380 quality: livekit::prelude::ConnectionQuality,
381) -> ConnectionQuality {
382 match quality {
383 livekit::prelude::ConnectionQuality::Excellent => ConnectionQuality::Excellent,
384 livekit::prelude::ConnectionQuality::Good => ConnectionQuality::Good,
385 livekit::prelude::ConnectionQuality::Poor => ConnectionQuality::Poor,
386 livekit::prelude::ConnectionQuality::Lost => ConnectionQuality::Lost,
387 }
388}
389
390fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
391 match participant {
392 livekit::participant::Participant::Local(local) => {
393 Participant::Local(LocalParticipant(local))
394 }
395 livekit::participant::Participant::Remote(remote) => {
396 Participant::Remote(RemoteParticipant(remote))
397 }
398 }
399}
400
401fn publication_from_livekit(
402 publication: livekit::publication::TrackPublication,
403) -> TrackPublication {
404 match publication {
405 livekit::publication::TrackPublication::Local(local) => {
406 TrackPublication::Local(LocalTrackPublication(local))
407 }
408 livekit::publication::TrackPublication::Remote(remote) => {
409 TrackPublication::Remote(RemoteTrackPublication(remote))
410 }
411 }
412}
413
414fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
415 match track {
416 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
417 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
418 }
419}
420
421fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
422 match track {
423 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
424 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
425 }
426}
427fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
428 let event = match event {
429 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
430 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
431 }
432 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
433 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
434 }
435 livekit::RoomEvent::LocalTrackPublished {
436 publication,
437 track,
438 participant,
439 } => RoomEvent::LocalTrackPublished {
440 publication: LocalTrackPublication(publication),
441 track: local_track_from_livekit(track),
442 participant: LocalParticipant(participant),
443 },
444 livekit::RoomEvent::LocalTrackUnpublished {
445 publication,
446 participant,
447 } => RoomEvent::LocalTrackUnpublished {
448 publication: LocalTrackPublication(publication),
449 participant: LocalParticipant(participant),
450 },
451 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
452 track: local_track_from_livekit(track),
453 },
454 livekit::RoomEvent::TrackSubscribed {
455 track,
456 publication,
457 participant,
458 } => RoomEvent::TrackSubscribed {
459 track: remote_track_from_livekit(track),
460 publication: RemoteTrackPublication(publication),
461 participant: RemoteParticipant(participant),
462 },
463 livekit::RoomEvent::TrackUnsubscribed {
464 track,
465 publication,
466 participant,
467 } => RoomEvent::TrackUnsubscribed {
468 track: remote_track_from_livekit(track),
469 publication: RemoteTrackPublication(publication),
470 participant: RemoteParticipant(participant),
471 },
472 livekit::RoomEvent::TrackSubscriptionFailed {
473 participant,
474 error: _,
475 track_sid,
476 } => RoomEvent::TrackSubscriptionFailed {
477 participant: RemoteParticipant(participant),
478 track_sid,
479 },
480 livekit::RoomEvent::TrackPublished {
481 publication,
482 participant,
483 } => RoomEvent::TrackPublished {
484 publication: RemoteTrackPublication(publication),
485 participant: RemoteParticipant(participant),
486 },
487 livekit::RoomEvent::TrackUnpublished {
488 publication,
489 participant,
490 } => RoomEvent::TrackUnpublished {
491 publication: RemoteTrackPublication(publication),
492 participant: RemoteParticipant(participant),
493 },
494 livekit::RoomEvent::TrackMuted {
495 participant,
496 publication,
497 } => RoomEvent::TrackMuted {
498 publication: publication_from_livekit(publication),
499 participant: participant_from_livekit(participant),
500 },
501 livekit::RoomEvent::TrackUnmuted {
502 participant,
503 publication,
504 } => RoomEvent::TrackUnmuted {
505 publication: publication_from_livekit(publication),
506 participant: participant_from_livekit(participant),
507 },
508 livekit::RoomEvent::RoomMetadataChanged {
509 old_metadata,
510 metadata,
511 } => RoomEvent::RoomMetadataChanged {
512 old_metadata,
513 metadata,
514 },
515 livekit::RoomEvent::ParticipantMetadataChanged {
516 participant,
517 old_metadata,
518 metadata,
519 } => RoomEvent::ParticipantMetadataChanged {
520 participant: participant_from_livekit(participant),
521 old_metadata,
522 metadata,
523 },
524 livekit::RoomEvent::ParticipantNameChanged {
525 participant,
526 old_name,
527 name,
528 } => RoomEvent::ParticipantNameChanged {
529 participant: participant_from_livekit(participant),
530 old_name,
531 name,
532 },
533 livekit::RoomEvent::ParticipantAttributesChanged {
534 participant,
535 changed_attributes,
536 } => RoomEvent::ParticipantAttributesChanged {
537 participant: participant_from_livekit(participant),
538 changed_attributes: changed_attributes.into_iter().collect(),
539 },
540 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
541 RoomEvent::ActiveSpeakersChanged {
542 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
543 }
544 }
545 livekit::RoomEvent::Connected {
546 participants_with_tracks,
547 } => RoomEvent::Connected {
548 participants_with_tracks: participants_with_tracks
549 .into_iter()
550 .map({
551 |(p, t)| {
552 (
553 RemoteParticipant(p),
554 t.into_iter().map(RemoteTrackPublication).collect(),
555 )
556 }
557 })
558 .collect(),
559 },
560 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
561 reason: reason.as_str_name(),
562 },
563 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
564 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
565 livekit::RoomEvent::ConnectionQualityChanged {
566 quality,
567 participant,
568 } => RoomEvent::ConnectionQualityChanged {
569 participant: participant_from_livekit(participant),
570 quality: connection_quality_from_livekit(quality),
571 },
572 _ => {
573 log::trace!("dropping livekit event: {:?}", event);
574 return None;
575 }
576 };
577
578 Some(event)
579}