1use anyhow::{Context as _, Result};
2use audio::AudioSettings;
3use collections::HashMap;
4use futures::{SinkExt, channel::mpsc};
5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
6use gpui_tokio::Tokio;
7
8use playback::capture_local_video_track;
9use settings::Settings;
10use std::sync::{Arc, atomic::AtomicU64};
11
12#[cfg(target_os = "linux")]
13mod linux;
14mod playback;
15
16use crate::{ConnectionQuality, LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
17pub use livekit::SessionStats;
18pub use livekit::webrtc::stats::RtcStats;
19pub use playback::AudioStream;
20pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
21
/// Newtype wrapper around `livekit::track::RemoteVideoTrack`.
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype wrapper around `livekit::track::RemoteAudioTrack`.
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype wrapper around `livekit::publication::RemoteTrackPublication`.
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype wrapper around `livekit::participant::RemoteParticipant`.
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
30
/// Newtype wrapper around `livekit::track::LocalVideoTrack`.
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype wrapper around `livekit::track::LocalAudioTrack`.
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype wrapper around `livekit::publication::LocalTrackPublication`.
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype wrapper around `livekit::participant::LocalParticipant`.
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
39
/// A LiveKit room bundled with the event-forwarding task and audio stack
/// created for it in `Room::connect`.
pub struct Room {
    /// The underlying LiveKit room handle.
    room: livekit::Room,
    /// Task spawned in `Room::connect` that converts LiveKit events and sends
    /// them to the receiver returned from `connect`. It is never read; it is
    /// held here for as long as the `Room` exists.
    _task: Task<()>,
    /// Audio stack used for microphone capture and remote audio playback.
    playback: playback::AudioStack,
}
45
/// Track identifier; alias for `livekit::id::TrackSid`.
pub type TrackSid = livekit::id::TrackSid;
/// Connection state of a room; alias for `livekit::ConnectionState`.
pub type ConnectionState = livekit::ConnectionState;
/// Identity string of a participant; used as the key of the map returned by
/// `Room::remote_participants`.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
50
51impl Room {
52 pub async fn connect(
53 url: String,
54 token: String,
55 cx: &mut AsyncApp,
56 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
57 let mut config = livekit::RoomOptions::default();
58 config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
59 let (room, mut events) = Tokio::spawn(cx, async move {
60 livekit::Room::connect(&url, &token, config).await
61 })
62 .await??;
63
64 let (mut tx, rx) = mpsc::unbounded();
65 let task = cx.background_executor().spawn(async move {
66 while let Some(event) = events.recv().await {
67 if let Some(event) = room_event_from_livekit(event) {
68 tx.send(event).await.ok();
69 }
70 }
71 });
72
73 Ok((
74 Self {
75 room,
76 _task: task,
77 playback: playback::AudioStack::new(cx.background_executor().clone()),
78 },
79 rx,
80 ))
81 }
82
83 pub fn local_participant(&self) -> LocalParticipant {
84 LocalParticipant(self.room.local_participant())
85 }
86
87 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
88 self.room
89 .remote_participants()
90 .into_iter()
91 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
92 .collect()
93 }
94
95 pub fn connection_state(&self) -> ConnectionState {
96 self.room.connection_state()
97 }
98
99 pub fn name(&self) -> String {
100 self.room.name()
101 }
102
103 pub async fn sid(&self) -> String {
104 self.room.sid().await.to_string()
105 }
106
107 pub async fn publish_local_microphone_track(
108 &self,
109 user_name: String,
110 is_staff: bool,
111 cx: &mut AsyncApp,
112 ) -> Result<(LocalTrackPublication, playback::AudioStream, Arc<AtomicU64>)> {
113 let (track, stream, input_lag_us) = self
114 .playback
115 .capture_local_microphone_track(user_name, is_staff, &cx)?;
116 let publication = self
117 .local_participant()
118 .publish_track(
119 livekit::track::LocalTrack::Audio(track.0),
120 livekit::options::TrackPublishOptions {
121 source: livekit::track::TrackSource::Microphone,
122 ..Default::default()
123 },
124 cx,
125 )
126 .await?;
127
128 Ok((publication, stream, input_lag_us))
129 }
130
131 pub async fn unpublish_local_track(
132 &self,
133 sid: TrackSid,
134 cx: &mut AsyncApp,
135 ) -> Result<LocalTrackPublication> {
136 self.local_participant().unpublish_track(sid, cx).await
137 }
138
139 pub fn play_remote_audio_track(
140 &self,
141 track: &RemoteAudioTrack,
142 cx: &mut App,
143 ) -> Result<playback::AudioStream> {
144 let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
145 Ok(self
146 .playback
147 .play_remote_audio_track(&track.0, output_audio_device))
148 }
149
150 pub async fn get_stats(&self) -> Result<livekit::SessionStats> {
151 self.room.get_stats().await.map_err(anyhow::Error::from)
152 }
153
154 /// Returns a `Task` that fetches room stats on the Tokio runtime.
155 ///
156 /// LiveKit's SDK is Tokio-based, so the stats fetch must run within
157 /// a Tokio context rather than on GPUI's smol-based background executor.
158 pub fn stats_task(&self, cx: &impl gpui::AppContext) -> Task<Result<livekit::SessionStats>> {
159 let inner = self.room.clone();
160 Tokio::spawn_result(cx, async move {
161 inner.get_stats().await.map_err(anyhow::Error::from)
162 })
163 }
164}
165
166impl LocalParticipant {
167 pub fn connection_quality(&self) -> ConnectionQuality {
168 connection_quality_from_livekit(self.0.connection_quality())
169 }
170
171 pub fn audio_level(&self) -> f32 {
172 self.0.audio_level()
173 }
174
175 pub async fn publish_screenshare_track(
176 &self,
177 source: &dyn ScreenCaptureSource,
178 cx: &mut AsyncApp,
179 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
180 let (track, stream) = capture_local_video_track(source, cx).await?;
181 let options = livekit::options::TrackPublishOptions {
182 source: livekit::track::TrackSource::Screenshare,
183 video_codec: livekit::options::VideoCodec::VP8,
184 ..Default::default()
185 };
186 let publication = self
187 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
188 .await?;
189
190 Ok((publication, stream))
191 }
192
193 async fn publish_track(
194 &self,
195 track: livekit::track::LocalTrack,
196 options: livekit::options::TrackPublishOptions,
197 cx: &mut AsyncApp,
198 ) -> Result<LocalTrackPublication> {
199 let participant = self.0.clone();
200 Tokio::spawn(cx, async move {
201 participant.publish_track(track, options).await
202 })
203 .await?
204 .map(LocalTrackPublication)
205 .context("publishing a track")
206 }
207
208 pub async fn unpublish_track(
209 &self,
210 sid: TrackSid,
211 cx: &mut AsyncApp,
212 ) -> Result<LocalTrackPublication> {
213 let participant = self.0.clone();
214 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
215 .await?
216 .map(LocalTrackPublication)
217 .context("unpublishing a track")
218 }
219
220 #[cfg(target_os = "linux")]
221 pub async fn publish_screenshare_track_wayland(
222 &self,
223 cx: &mut AsyncApp,
224 ) -> Result<(
225 LocalTrackPublication,
226 Box<dyn ScreenCaptureStream>,
227 futures::channel::oneshot::Receiver<()>,
228 )> {
229 let (track, stop_flag, feed_task, failure_rx) =
230 linux::start_wayland_desktop_capture(cx).await?;
231 let options = livekit::options::TrackPublishOptions {
232 source: livekit::track::TrackSource::Screenshare,
233 video_codec: livekit::options::VideoCodec::VP8,
234 ..Default::default()
235 };
236 let publication = self
237 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
238 .await?;
239
240 Ok((
241 publication,
242 Box::new(linux::WaylandScreenCaptureStream::new(stop_flag, feed_task)),
243 failure_rx,
244 ))
245 }
246}
247
248impl LocalTrackPublication {
249 pub fn mute(&self, cx: &App) {
250 let track = self.0.clone();
251 Tokio::spawn(cx, async move {
252 track.mute();
253 })
254 .detach();
255 }
256
257 pub fn unmute(&self, cx: &App) {
258 let track = self.0.clone();
259 Tokio::spawn(cx, async move {
260 track.unmute();
261 })
262 .detach();
263 }
264
265 pub fn sid(&self) -> TrackSid {
266 self.0.sid()
267 }
268
269 pub fn is_muted(&self) -> bool {
270 self.0.is_muted()
271 }
272}
273
274impl RemoteParticipant {
275 pub fn connection_quality(&self) -> ConnectionQuality {
276 connection_quality_from_livekit(self.0.connection_quality())
277 }
278
279 pub fn audio_level(&self) -> f32 {
280 self.0.audio_level()
281 }
282
283 pub fn identity(&self) -> ParticipantIdentity {
284 ParticipantIdentity(self.0.identity().0)
285 }
286
287 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
288 self.0
289 .track_publications()
290 .into_iter()
291 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
292 .collect()
293 }
294}
295
impl RemoteAudioTrack {
    /// Returns the SID of the wrapped LiveKit audio track.
    pub fn sid(&self) -> TrackSid {
        self.0.sid()
    }
}
301
impl RemoteVideoTrack {
    /// Returns the SID of the wrapped LiveKit video track.
    pub fn sid(&self) -> TrackSid {
        self.0.sid()
    }
}
307
308impl RemoteTrackPublication {
309 pub fn is_muted(&self) -> bool {
310 self.0.is_muted()
311 }
312
313 pub fn is_enabled(&self) -> bool {
314 self.0.is_enabled()
315 }
316
317 pub fn track(&self) -> Option<RemoteTrack> {
318 self.0.track().map(remote_track_from_livekit)
319 }
320
321 pub fn is_audio(&self) -> bool {
322 self.0.kind() == livekit::track::TrackKind::Audio
323 }
324
325 pub fn set_enabled(&self, enabled: bool, cx: &App) {
326 let track = self.0.clone();
327 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
328 }
329
330 pub fn sid(&self) -> TrackSid {
331 self.0.sid()
332 }
333}
334
335impl Participant {
336 pub fn identity(&self) -> ParticipantIdentity {
337 match self {
338 Participant::Local(local_participant) => {
339 ParticipantIdentity(local_participant.0.identity().0)
340 }
341 Participant::Remote(remote_participant) => {
342 ParticipantIdentity(remote_participant.0.identity().0)
343 }
344 }
345 }
346
347 pub fn connection_quality(&self) -> ConnectionQuality {
348 match self {
349 Participant::Local(local_participant) => local_participant.connection_quality(),
350 Participant::Remote(remote_participant) => remote_participant.connection_quality(),
351 }
352 }
353
354 pub fn audio_level(&self) -> f32 {
355 match self {
356 Participant::Local(local_participant) => local_participant.audio_level(),
357 Participant::Remote(remote_participant) => remote_participant.audio_level(),
358 }
359 }
360}
361
362fn connection_quality_from_livekit(
363 quality: livekit::prelude::ConnectionQuality,
364) -> ConnectionQuality {
365 match quality {
366 livekit::prelude::ConnectionQuality::Excellent => ConnectionQuality::Excellent,
367 livekit::prelude::ConnectionQuality::Good => ConnectionQuality::Good,
368 livekit::prelude::ConnectionQuality::Poor => ConnectionQuality::Poor,
369 livekit::prelude::ConnectionQuality::Lost => ConnectionQuality::Lost,
370 }
371}
372
373fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
374 match participant {
375 livekit::participant::Participant::Local(local) => {
376 Participant::Local(LocalParticipant(local))
377 }
378 livekit::participant::Participant::Remote(remote) => {
379 Participant::Remote(RemoteParticipant(remote))
380 }
381 }
382}
383
384fn publication_from_livekit(
385 publication: livekit::publication::TrackPublication,
386) -> TrackPublication {
387 match publication {
388 livekit::publication::TrackPublication::Local(local) => {
389 TrackPublication::Local(LocalTrackPublication(local))
390 }
391 livekit::publication::TrackPublication::Remote(remote) => {
392 TrackPublication::Remote(RemoteTrackPublication(remote))
393 }
394 }
395}
396
397fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
398 match track {
399 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
400 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
401 }
402}
403
404fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
405 match track {
406 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
407 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
408 }
409}
410fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
411 let event = match event {
412 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
413 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
414 }
415 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
416 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
417 }
418 livekit::RoomEvent::LocalTrackPublished {
419 publication,
420 track,
421 participant,
422 } => RoomEvent::LocalTrackPublished {
423 publication: LocalTrackPublication(publication),
424 track: local_track_from_livekit(track),
425 participant: LocalParticipant(participant),
426 },
427 livekit::RoomEvent::LocalTrackUnpublished {
428 publication,
429 participant,
430 } => RoomEvent::LocalTrackUnpublished {
431 publication: LocalTrackPublication(publication),
432 participant: LocalParticipant(participant),
433 },
434 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
435 track: local_track_from_livekit(track),
436 },
437 livekit::RoomEvent::TrackSubscribed {
438 track,
439 publication,
440 participant,
441 } => RoomEvent::TrackSubscribed {
442 track: remote_track_from_livekit(track),
443 publication: RemoteTrackPublication(publication),
444 participant: RemoteParticipant(participant),
445 },
446 livekit::RoomEvent::TrackUnsubscribed {
447 track,
448 publication,
449 participant,
450 } => RoomEvent::TrackUnsubscribed {
451 track: remote_track_from_livekit(track),
452 publication: RemoteTrackPublication(publication),
453 participant: RemoteParticipant(participant),
454 },
455 livekit::RoomEvent::TrackSubscriptionFailed {
456 participant,
457 error: _,
458 track_sid,
459 } => RoomEvent::TrackSubscriptionFailed {
460 participant: RemoteParticipant(participant),
461 track_sid,
462 },
463 livekit::RoomEvent::TrackPublished {
464 publication,
465 participant,
466 } => RoomEvent::TrackPublished {
467 publication: RemoteTrackPublication(publication),
468 participant: RemoteParticipant(participant),
469 },
470 livekit::RoomEvent::TrackUnpublished {
471 publication,
472 participant,
473 } => RoomEvent::TrackUnpublished {
474 publication: RemoteTrackPublication(publication),
475 participant: RemoteParticipant(participant),
476 },
477 livekit::RoomEvent::TrackMuted {
478 participant,
479 publication,
480 } => RoomEvent::TrackMuted {
481 publication: publication_from_livekit(publication),
482 participant: participant_from_livekit(participant),
483 },
484 livekit::RoomEvent::TrackUnmuted {
485 participant,
486 publication,
487 } => RoomEvent::TrackUnmuted {
488 publication: publication_from_livekit(publication),
489 participant: participant_from_livekit(participant),
490 },
491 livekit::RoomEvent::RoomMetadataChanged {
492 old_metadata,
493 metadata,
494 } => RoomEvent::RoomMetadataChanged {
495 old_metadata,
496 metadata,
497 },
498 livekit::RoomEvent::ParticipantMetadataChanged {
499 participant,
500 old_metadata,
501 metadata,
502 } => RoomEvent::ParticipantMetadataChanged {
503 participant: participant_from_livekit(participant),
504 old_metadata,
505 metadata,
506 },
507 livekit::RoomEvent::ParticipantNameChanged {
508 participant,
509 old_name,
510 name,
511 } => RoomEvent::ParticipantNameChanged {
512 participant: participant_from_livekit(participant),
513 old_name,
514 name,
515 },
516 livekit::RoomEvent::ParticipantAttributesChanged {
517 participant,
518 changed_attributes,
519 } => RoomEvent::ParticipantAttributesChanged {
520 participant: participant_from_livekit(participant),
521 changed_attributes: changed_attributes.into_iter().collect(),
522 },
523 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
524 RoomEvent::ActiveSpeakersChanged {
525 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
526 }
527 }
528 livekit::RoomEvent::Connected {
529 participants_with_tracks,
530 } => RoomEvent::Connected {
531 participants_with_tracks: participants_with_tracks
532 .into_iter()
533 .map({
534 |(p, t)| {
535 (
536 RemoteParticipant(p),
537 t.into_iter().map(RemoteTrackPublication).collect(),
538 )
539 }
540 })
541 .collect(),
542 },
543 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
544 reason: reason.as_str_name(),
545 },
546 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
547 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
548 livekit::RoomEvent::ConnectionQualityChanged {
549 quality,
550 participant,
551 } => RoomEvent::ConnectionQualityChanged {
552 participant: participant_from_livekit(participant),
553 quality: connection_quality_from_livekit(quality),
554 },
555 _ => {
556 log::trace!("dropping livekit event: {:?}", event);
557 return None;
558 }
559 };
560
561 Some(event)
562}