1use anyhow::{Context as _, Result, anyhow};
2use audio::AudioSettings;
3use collections::HashMap;
4use futures::{SinkExt, channel::mpsc};
5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
6use gpui_tokio::Tokio;
7use log::info;
8use playback::capture_local_video_track;
9use settings::Settings;
10
11mod playback;
12
13use crate::{
14 LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
15 livekit_client::playback::Speaker,
16};
17pub use playback::AudioStream;
18pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
19
20#[derive(Clone, Debug)]
21pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
22#[derive(Clone, Debug)]
23pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
24#[derive(Clone, Debug)]
25pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
26#[derive(Clone, Debug)]
27pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
28
29#[derive(Clone, Debug)]
30pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
31#[derive(Clone, Debug)]
32pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
33#[derive(Clone, Debug)]
34pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
35#[derive(Clone, Debug)]
36pub struct LocalParticipant(livekit::participant::LocalParticipant);
37
38pub struct Room {
39 room: livekit::Room,
40 _task: Task<()>,
41 playback: playback::AudioStack,
42}
43
44pub type TrackSid = livekit::id::TrackSid;
45pub type ConnectionState = livekit::ConnectionState;
46#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
47pub struct ParticipantIdentity(pub String);
48
49impl Room {
50 pub async fn connect(
51 url: String,
52 token: String,
53 cx: &mut AsyncApp,
54 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
55 let mut config = livekit::RoomOptions::default();
56 config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
57 let (room, mut events) = Tokio::spawn(cx, async move {
58 livekit::Room::connect(&url, &token, config).await
59 })
60 .await??;
61
62 let (mut tx, rx) = mpsc::unbounded();
63 let task = cx.background_executor().spawn(async move {
64 while let Some(event) = events.recv().await {
65 if let Some(event) = room_event_from_livekit(event) {
66 tx.send(event).await.ok();
67 }
68 }
69 });
70
71 Ok((
72 Self {
73 room,
74 _task: task,
75 playback: playback::AudioStack::new(cx.background_executor().clone()),
76 },
77 rx,
78 ))
79 }
80
81 pub fn local_participant(&self) -> LocalParticipant {
82 LocalParticipant(self.room.local_participant())
83 }
84
85 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
86 self.room
87 .remote_participants()
88 .into_iter()
89 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
90 .collect()
91 }
92
93 pub fn connection_state(&self) -> ConnectionState {
94 self.room.connection_state()
95 }
96
97 pub fn name(&self) -> String {
98 self.room.name()
99 }
100
101 pub async fn sid(&self) -> String {
102 self.room.sid().await.to_string()
103 }
104
105 pub async fn publish_local_microphone_track(
106 &self,
107 user_name: String,
108 is_staff: bool,
109 cx: &mut AsyncApp,
110 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
111 let (track, stream) = self
112 .playback
113 .capture_local_microphone_track(user_name, is_staff, &cx)?;
114 let publication = self
115 .local_participant()
116 .publish_track(
117 livekit::track::LocalTrack::Audio(track.0),
118 livekit::options::TrackPublishOptions {
119 source: livekit::track::TrackSource::Microphone,
120 ..Default::default()
121 },
122 cx,
123 )
124 .await?;
125
126 Ok((publication, stream))
127 }
128
129 pub async fn unpublish_local_track(
130 &self,
131 sid: TrackSid,
132 cx: &mut AsyncApp,
133 ) -> Result<LocalTrackPublication> {
134 self.local_participant().unpublish_track(sid, cx).await
135 }
136
137 pub fn play_remote_audio_track(
138 &self,
139 track: &RemoteAudioTrack,
140 cx: &mut App,
141 ) -> Result<playback::AudioStream> {
142 let speaker: Speaker =
143 serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
144 name: track.0.name(),
145 is_staff: false,
146 sends_legacy_audio: true,
147 });
148
149 if AudioSettings::get_global(cx).rodio_audio {
150 info!("Using experimental.rodio_audio audio pipeline for output");
151 playback::play_remote_audio_track(&track.0, speaker, cx)
152 } else if speaker.sends_legacy_audio {
153 let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
154 Ok(self
155 .playback
156 .play_remote_audio_track(&track.0, output_audio_device))
157 } else {
158 Err(anyhow!("Client version too old to play audio in call"))
159 }
160 }
161}
162
163impl LocalParticipant {
164 pub async fn publish_screenshare_track(
165 &self,
166 source: &dyn ScreenCaptureSource,
167 cx: &mut AsyncApp,
168 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
169 let (track, stream) = capture_local_video_track(source, cx).await?;
170 let options = livekit::options::TrackPublishOptions {
171 source: livekit::track::TrackSource::Screenshare,
172 video_codec: livekit::options::VideoCodec::VP8,
173 ..Default::default()
174 };
175 let publication = self
176 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
177 .await?;
178
179 Ok((publication, stream))
180 }
181
182 async fn publish_track(
183 &self,
184 track: livekit::track::LocalTrack,
185 options: livekit::options::TrackPublishOptions,
186 cx: &mut AsyncApp,
187 ) -> Result<LocalTrackPublication> {
188 let participant = self.0.clone();
189 Tokio::spawn(cx, async move {
190 participant.publish_track(track, options).await
191 })
192 .await?
193 .map(LocalTrackPublication)
194 .context("publishing a track")
195 }
196
197 pub async fn unpublish_track(
198 &self,
199 sid: TrackSid,
200 cx: &mut AsyncApp,
201 ) -> Result<LocalTrackPublication> {
202 let participant = self.0.clone();
203 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
204 .await?
205 .map(LocalTrackPublication)
206 .context("unpublishing a track")
207 }
208}
209
210impl LocalTrackPublication {
211 pub fn mute(&self, cx: &App) {
212 let track = self.0.clone();
213 Tokio::spawn(cx, async move {
214 track.mute();
215 })
216 .detach();
217 }
218
219 pub fn unmute(&self, cx: &App) {
220 let track = self.0.clone();
221 Tokio::spawn(cx, async move {
222 track.unmute();
223 })
224 .detach();
225 }
226
227 pub fn sid(&self) -> TrackSid {
228 self.0.sid()
229 }
230
231 pub fn is_muted(&self) -> bool {
232 self.0.is_muted()
233 }
234}
235
236impl RemoteParticipant {
237 pub fn identity(&self) -> ParticipantIdentity {
238 ParticipantIdentity(self.0.identity().0)
239 }
240
241 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
242 self.0
243 .track_publications()
244 .into_iter()
245 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
246 .collect()
247 }
248}
249
250impl RemoteAudioTrack {
251 pub fn sid(&self) -> TrackSid {
252 self.0.sid()
253 }
254}
255
256impl RemoteVideoTrack {
257 pub fn sid(&self) -> TrackSid {
258 self.0.sid()
259 }
260}
261
262impl RemoteTrackPublication {
263 pub fn is_muted(&self) -> bool {
264 self.0.is_muted()
265 }
266
267 pub fn is_enabled(&self) -> bool {
268 self.0.is_enabled()
269 }
270
271 pub fn track(&self) -> Option<RemoteTrack> {
272 self.0.track().map(remote_track_from_livekit)
273 }
274
275 pub fn is_audio(&self) -> bool {
276 self.0.kind() == livekit::track::TrackKind::Audio
277 }
278
279 pub fn set_enabled(&self, enabled: bool, cx: &App) {
280 let track = self.0.clone();
281 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
282 }
283
284 pub fn sid(&self) -> TrackSid {
285 self.0.sid()
286 }
287}
288
289impl Participant {
290 pub fn identity(&self) -> ParticipantIdentity {
291 match self {
292 Participant::Local(local_participant) => {
293 ParticipantIdentity(local_participant.0.identity().0)
294 }
295 Participant::Remote(remote_participant) => {
296 ParticipantIdentity(remote_participant.0.identity().0)
297 }
298 }
299 }
300}
301
302fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
303 match participant {
304 livekit::participant::Participant::Local(local) => {
305 Participant::Local(LocalParticipant(local))
306 }
307 livekit::participant::Participant::Remote(remote) => {
308 Participant::Remote(RemoteParticipant(remote))
309 }
310 }
311}
312
313fn publication_from_livekit(
314 publication: livekit::publication::TrackPublication,
315) -> TrackPublication {
316 match publication {
317 livekit::publication::TrackPublication::Local(local) => {
318 TrackPublication::Local(LocalTrackPublication(local))
319 }
320 livekit::publication::TrackPublication::Remote(remote) => {
321 TrackPublication::Remote(RemoteTrackPublication(remote))
322 }
323 }
324}
325
326fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
327 match track {
328 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
329 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
330 }
331}
332
333fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
334 match track {
335 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
336 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
337 }
338}
339fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
340 let event = match event {
341 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
342 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
343 }
344 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
345 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
346 }
347 livekit::RoomEvent::LocalTrackPublished {
348 publication,
349 track,
350 participant,
351 } => RoomEvent::LocalTrackPublished {
352 publication: LocalTrackPublication(publication),
353 track: local_track_from_livekit(track),
354 participant: LocalParticipant(participant),
355 },
356 livekit::RoomEvent::LocalTrackUnpublished {
357 publication,
358 participant,
359 } => RoomEvent::LocalTrackUnpublished {
360 publication: LocalTrackPublication(publication),
361 participant: LocalParticipant(participant),
362 },
363 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
364 track: local_track_from_livekit(track),
365 },
366 livekit::RoomEvent::TrackSubscribed {
367 track,
368 publication,
369 participant,
370 } => RoomEvent::TrackSubscribed {
371 track: remote_track_from_livekit(track),
372 publication: RemoteTrackPublication(publication),
373 participant: RemoteParticipant(participant),
374 },
375 livekit::RoomEvent::TrackUnsubscribed {
376 track,
377 publication,
378 participant,
379 } => RoomEvent::TrackUnsubscribed {
380 track: remote_track_from_livekit(track),
381 publication: RemoteTrackPublication(publication),
382 participant: RemoteParticipant(participant),
383 },
384 livekit::RoomEvent::TrackSubscriptionFailed {
385 participant,
386 error: _,
387 track_sid,
388 } => RoomEvent::TrackSubscriptionFailed {
389 participant: RemoteParticipant(participant),
390 track_sid,
391 },
392 livekit::RoomEvent::TrackPublished {
393 publication,
394 participant,
395 } => RoomEvent::TrackPublished {
396 publication: RemoteTrackPublication(publication),
397 participant: RemoteParticipant(participant),
398 },
399 livekit::RoomEvent::TrackUnpublished {
400 publication,
401 participant,
402 } => RoomEvent::TrackUnpublished {
403 publication: RemoteTrackPublication(publication),
404 participant: RemoteParticipant(participant),
405 },
406 livekit::RoomEvent::TrackMuted {
407 participant,
408 publication,
409 } => RoomEvent::TrackMuted {
410 publication: publication_from_livekit(publication),
411 participant: participant_from_livekit(participant),
412 },
413 livekit::RoomEvent::TrackUnmuted {
414 participant,
415 publication,
416 } => RoomEvent::TrackUnmuted {
417 publication: publication_from_livekit(publication),
418 participant: participant_from_livekit(participant),
419 },
420 livekit::RoomEvent::RoomMetadataChanged {
421 old_metadata,
422 metadata,
423 } => RoomEvent::RoomMetadataChanged {
424 old_metadata,
425 metadata,
426 },
427 livekit::RoomEvent::ParticipantMetadataChanged {
428 participant,
429 old_metadata,
430 metadata,
431 } => RoomEvent::ParticipantMetadataChanged {
432 participant: participant_from_livekit(participant),
433 old_metadata,
434 metadata,
435 },
436 livekit::RoomEvent::ParticipantNameChanged {
437 participant,
438 old_name,
439 name,
440 } => RoomEvent::ParticipantNameChanged {
441 participant: participant_from_livekit(participant),
442 old_name,
443 name,
444 },
445 livekit::RoomEvent::ParticipantAttributesChanged {
446 participant,
447 changed_attributes,
448 } => RoomEvent::ParticipantAttributesChanged {
449 participant: participant_from_livekit(participant),
450 changed_attributes: changed_attributes.into_iter().collect(),
451 },
452 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
453 RoomEvent::ActiveSpeakersChanged {
454 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
455 }
456 }
457 livekit::RoomEvent::Connected {
458 participants_with_tracks,
459 } => RoomEvent::Connected {
460 participants_with_tracks: participants_with_tracks
461 .into_iter()
462 .map({
463 |(p, t)| {
464 (
465 RemoteParticipant(p),
466 t.into_iter().map(RemoteTrackPublication).collect(),
467 )
468 }
469 })
470 .collect(),
471 },
472 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
473 reason: reason.as_str_name(),
474 },
475 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
476 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
477 _ => {
478 log::trace!("dropping livekit event: {:?}", event);
479 return None;
480 }
481 };
482
483 Some(event)
484}