1use std::sync::Arc;
2
3use anyhow::{Context as _, Result, anyhow};
4use audio::AudioSettings;
5use collections::HashMap;
6use futures::{SinkExt, channel::mpsc};
7use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
8use gpui_tokio::Tokio;
9use log::info;
10use playback::capture_local_video_track;
11use settings::Settings;
12
13mod playback;
14
15use crate::{
16 LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
17 livekit_client::playback::Speaker,
18};
19pub use playback::AudioStream;
20pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
21
22#[derive(Clone, Debug)]
23pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
24#[derive(Clone, Debug)]
25pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
26#[derive(Clone, Debug)]
27pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
28#[derive(Clone, Debug)]
29pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
30
31#[derive(Clone, Debug)]
32pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
33#[derive(Clone, Debug)]
34pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
35#[derive(Clone, Debug)]
36pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
37#[derive(Clone, Debug)]
38pub struct LocalParticipant(livekit::participant::LocalParticipant);
39
40pub struct Room {
41 room: livekit::Room,
42 _task: Task<()>,
43 playback: playback::AudioStack,
44}
45
46pub type TrackSid = livekit::id::TrackSid;
47pub type ConnectionState = livekit::ConnectionState;
/// Identity string of a room participant.
///
/// Built from the inner string of LiveKit's participant identity wherever
/// this module converts participants.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ParticipantIdentity(pub String);
50
51impl Room {
52 pub async fn connect(
53 url: String,
54 token: String,
55 cx: &mut AsyncApp,
56 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
57 let connector =
58 tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
59 let mut config = livekit::RoomOptions::default();
60 config.connector = Some(connector);
61 let (room, mut events) = Tokio::spawn(cx, async move {
62 livekit::Room::connect(&url, &token, config).await
63 })?
64 .await??;
65
66 let (mut tx, rx) = mpsc::unbounded();
67 let task = cx.background_executor().spawn(async move {
68 while let Some(event) = events.recv().await {
69 if let Some(event) = room_event_from_livekit(event) {
70 tx.send(event).await.ok();
71 }
72 }
73 });
74
75 Ok((
76 Self {
77 room,
78 _task: task,
79 playback: playback::AudioStack::new(cx.background_executor().clone()),
80 },
81 rx,
82 ))
83 }
84
85 pub fn local_participant(&self) -> LocalParticipant {
86 LocalParticipant(self.room.local_participant())
87 }
88
89 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
90 self.room
91 .remote_participants()
92 .into_iter()
93 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
94 .collect()
95 }
96
97 pub fn connection_state(&self) -> ConnectionState {
98 self.room.connection_state()
99 }
100
101 pub fn name(&self) -> String {
102 self.room.name()
103 }
104
105 pub async fn sid(&self) -> String {
106 self.room.sid().await.to_string()
107 }
108
109 pub async fn publish_local_microphone_track(
110 &self,
111 user_name: String,
112 is_staff: bool,
113 cx: &mut AsyncApp,
114 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
115 let (track, stream) = self
116 .playback
117 .capture_local_microphone_track(user_name, is_staff, &cx)?;
118 let publication = self
119 .local_participant()
120 .publish_track(
121 livekit::track::LocalTrack::Audio(track.0),
122 livekit::options::TrackPublishOptions {
123 source: livekit::track::TrackSource::Microphone,
124 ..Default::default()
125 },
126 cx,
127 )
128 .await?;
129
130 Ok((publication, stream))
131 }
132
133 pub async fn unpublish_local_track(
134 &self,
135 sid: TrackSid,
136 cx: &mut AsyncApp,
137 ) -> Result<LocalTrackPublication> {
138 self.local_participant().unpublish_track(sid, cx).await
139 }
140
141 pub fn play_remote_audio_track(
142 &self,
143 track: &RemoteAudioTrack,
144 cx: &mut App,
145 ) -> Result<playback::AudioStream> {
146 let speaker: Speaker =
147 serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
148 name: track.0.name(),
149 is_staff: false,
150 sends_legacy_audio: true,
151 });
152
153 if AudioSettings::get_global(cx).rodio_audio {
154 info!("Using experimental.rodio_audio audio pipeline for output");
155 playback::play_remote_audio_track(&track.0, speaker, cx)
156 } else if speaker.sends_legacy_audio {
157 Ok(self.playback.play_remote_audio_track(&track.0))
158 } else {
159 Err(anyhow!("Client version too old to play audio in call"))
160 }
161 }
162}
163
164impl LocalParticipant {
165 pub async fn publish_screenshare_track(
166 &self,
167 source: &dyn ScreenCaptureSource,
168 cx: &mut AsyncApp,
169 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
170 let (track, stream) = capture_local_video_track(source, cx).await?;
171 let options = livekit::options::TrackPublishOptions {
172 source: livekit::track::TrackSource::Screenshare,
173 video_codec: livekit::options::VideoCodec::VP8,
174 ..Default::default()
175 };
176 let publication = self
177 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
178 .await?;
179
180 Ok((publication, stream))
181 }
182
183 async fn publish_track(
184 &self,
185 track: livekit::track::LocalTrack,
186 options: livekit::options::TrackPublishOptions,
187 cx: &mut AsyncApp,
188 ) -> Result<LocalTrackPublication> {
189 let participant = self.0.clone();
190 Tokio::spawn(cx, async move {
191 participant.publish_track(track, options).await
192 })?
193 .await?
194 .map(LocalTrackPublication)
195 .context("publishing a track")
196 }
197
198 pub async fn unpublish_track(
199 &self,
200 sid: TrackSid,
201 cx: &mut AsyncApp,
202 ) -> Result<LocalTrackPublication> {
203 let participant = self.0.clone();
204 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
205 .await?
206 .map(LocalTrackPublication)
207 .context("unpublishing a track")
208 }
209}
210
211impl LocalTrackPublication {
212 pub fn mute(&self, cx: &App) {
213 let track = self.0.clone();
214 Tokio::spawn(cx, async move {
215 track.mute();
216 })
217 .detach();
218 }
219
220 pub fn unmute(&self, cx: &App) {
221 let track = self.0.clone();
222 Tokio::spawn(cx, async move {
223 track.unmute();
224 })
225 .detach();
226 }
227
228 pub fn sid(&self) -> TrackSid {
229 self.0.sid()
230 }
231
232 pub fn is_muted(&self) -> bool {
233 self.0.is_muted()
234 }
235}
236
237impl RemoteParticipant {
238 pub fn identity(&self) -> ParticipantIdentity {
239 ParticipantIdentity(self.0.identity().0)
240 }
241
242 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
243 self.0
244 .track_publications()
245 .into_iter()
246 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
247 .collect()
248 }
249}
250
251impl RemoteAudioTrack {
252 pub fn sid(&self) -> TrackSid {
253 self.0.sid()
254 }
255}
256
257impl RemoteVideoTrack {
258 pub fn sid(&self) -> TrackSid {
259 self.0.sid()
260 }
261}
262
263impl RemoteTrackPublication {
264 pub fn is_muted(&self) -> bool {
265 self.0.is_muted()
266 }
267
268 pub fn is_enabled(&self) -> bool {
269 self.0.is_enabled()
270 }
271
272 pub fn track(&self) -> Option<RemoteTrack> {
273 self.0.track().map(remote_track_from_livekit)
274 }
275
276 pub fn is_audio(&self) -> bool {
277 self.0.kind() == livekit::track::TrackKind::Audio
278 }
279
280 pub fn set_enabled(&self, enabled: bool, cx: &App) {
281 let track = self.0.clone();
282 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
283 }
284
285 pub fn sid(&self) -> TrackSid {
286 self.0.sid()
287 }
288}
289
290impl Participant {
291 pub fn identity(&self) -> ParticipantIdentity {
292 match self {
293 Participant::Local(local_participant) => {
294 ParticipantIdentity(local_participant.0.identity().0)
295 }
296 Participant::Remote(remote_participant) => {
297 ParticipantIdentity(remote_participant.0.identity().0)
298 }
299 }
300 }
301}
302
303fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
304 match participant {
305 livekit::participant::Participant::Local(local) => {
306 Participant::Local(LocalParticipant(local))
307 }
308 livekit::participant::Participant::Remote(remote) => {
309 Participant::Remote(RemoteParticipant(remote))
310 }
311 }
312}
313
314fn publication_from_livekit(
315 publication: livekit::publication::TrackPublication,
316) -> TrackPublication {
317 match publication {
318 livekit::publication::TrackPublication::Local(local) => {
319 TrackPublication::Local(LocalTrackPublication(local))
320 }
321 livekit::publication::TrackPublication::Remote(remote) => {
322 TrackPublication::Remote(RemoteTrackPublication(remote))
323 }
324 }
325}
326
327fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
328 match track {
329 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
330 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
331 }
332}
333
334fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
335 match track {
336 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
337 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
338 }
339}
340fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
341 let event = match event {
342 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
343 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
344 }
345 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
346 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
347 }
348 livekit::RoomEvent::LocalTrackPublished {
349 publication,
350 track,
351 participant,
352 } => RoomEvent::LocalTrackPublished {
353 publication: LocalTrackPublication(publication),
354 track: local_track_from_livekit(track),
355 participant: LocalParticipant(participant),
356 },
357 livekit::RoomEvent::LocalTrackUnpublished {
358 publication,
359 participant,
360 } => RoomEvent::LocalTrackUnpublished {
361 publication: LocalTrackPublication(publication),
362 participant: LocalParticipant(participant),
363 },
364 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
365 track: local_track_from_livekit(track),
366 },
367 livekit::RoomEvent::TrackSubscribed {
368 track,
369 publication,
370 participant,
371 } => RoomEvent::TrackSubscribed {
372 track: remote_track_from_livekit(track),
373 publication: RemoteTrackPublication(publication),
374 participant: RemoteParticipant(participant),
375 },
376 livekit::RoomEvent::TrackUnsubscribed {
377 track,
378 publication,
379 participant,
380 } => RoomEvent::TrackUnsubscribed {
381 track: remote_track_from_livekit(track),
382 publication: RemoteTrackPublication(publication),
383 participant: RemoteParticipant(participant),
384 },
385 livekit::RoomEvent::TrackSubscriptionFailed {
386 participant,
387 error: _,
388 track_sid,
389 } => RoomEvent::TrackSubscriptionFailed {
390 participant: RemoteParticipant(participant),
391 track_sid,
392 },
393 livekit::RoomEvent::TrackPublished {
394 publication,
395 participant,
396 } => RoomEvent::TrackPublished {
397 publication: RemoteTrackPublication(publication),
398 participant: RemoteParticipant(participant),
399 },
400 livekit::RoomEvent::TrackUnpublished {
401 publication,
402 participant,
403 } => RoomEvent::TrackUnpublished {
404 publication: RemoteTrackPublication(publication),
405 participant: RemoteParticipant(participant),
406 },
407 livekit::RoomEvent::TrackMuted {
408 participant,
409 publication,
410 } => RoomEvent::TrackMuted {
411 publication: publication_from_livekit(publication),
412 participant: participant_from_livekit(participant),
413 },
414 livekit::RoomEvent::TrackUnmuted {
415 participant,
416 publication,
417 } => RoomEvent::TrackUnmuted {
418 publication: publication_from_livekit(publication),
419 participant: participant_from_livekit(participant),
420 },
421 livekit::RoomEvent::RoomMetadataChanged {
422 old_metadata,
423 metadata,
424 } => RoomEvent::RoomMetadataChanged {
425 old_metadata,
426 metadata,
427 },
428 livekit::RoomEvent::ParticipantMetadataChanged {
429 participant,
430 old_metadata,
431 metadata,
432 } => RoomEvent::ParticipantMetadataChanged {
433 participant: participant_from_livekit(participant),
434 old_metadata,
435 metadata,
436 },
437 livekit::RoomEvent::ParticipantNameChanged {
438 participant,
439 old_name,
440 name,
441 } => RoomEvent::ParticipantNameChanged {
442 participant: participant_from_livekit(participant),
443 old_name,
444 name,
445 },
446 livekit::RoomEvent::ParticipantAttributesChanged {
447 participant,
448 changed_attributes,
449 } => RoomEvent::ParticipantAttributesChanged {
450 participant: participant_from_livekit(participant),
451 changed_attributes: changed_attributes.into_iter().collect(),
452 },
453 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
454 RoomEvent::ActiveSpeakersChanged {
455 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
456 }
457 }
458 livekit::RoomEvent::Connected {
459 participants_with_tracks,
460 } => RoomEvent::Connected {
461 participants_with_tracks: participants_with_tracks
462 .into_iter()
463 .map({
464 |(p, t)| {
465 (
466 RemoteParticipant(p),
467 t.into_iter().map(RemoteTrackPublication).collect(),
468 )
469 }
470 })
471 .collect(),
472 },
473 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
474 reason: reason.as_str_name(),
475 },
476 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
477 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
478 _ => {
479 log::trace!("dropping livekit event: {:?}", event);
480 return None;
481 }
482 };
483
484 Some(event)
485}