1use std::sync::Arc;
2
3use anyhow::{Context as _, Result, anyhow};
4use audio::AudioSettings;
5use collections::HashMap;
6use futures::{SinkExt, channel::mpsc};
7use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
8use gpui_tokio::Tokio;
9use log::info;
10use playback::capture_local_video_track;
11use settings::Settings;
12
13mod playback;
14
15use crate::{
16 LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
17 livekit_client::playback::Speaker,
18};
19pub use playback::AudioStream;
20pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
21
/// Newtype wrapper around [`livekit::track::RemoteVideoTrack`].
///
/// The wrapped value is a private field, so it is only reachable from this module tree.
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype wrapper around [`livekit::track::RemoteAudioTrack`].
///
/// The wrapped value is a private field, so it is only reachable from this module tree.
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype wrapper around [`livekit::publication::RemoteTrackPublication`].
///
/// The wrapped value is a private field, so it is only reachable from this module tree.
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype wrapper around [`livekit::participant::RemoteParticipant`].
///
/// The wrapped value is a private field, so it is only reachable from this module tree.
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
30
/// Newtype wrapper around [`livekit::track::LocalVideoTrack`].
///
/// The wrapped value is a private field, so it is only reachable from this module tree.
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype wrapper around [`livekit::track::LocalAudioTrack`].
///
/// The wrapped value is a private field, so it is only reachable from this module tree.
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype wrapper around [`livekit::publication::LocalTrackPublication`].
///
/// The wrapped value is a private field, so it is only reachable from this module tree.
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype wrapper around [`livekit::participant::LocalParticipant`].
///
/// The wrapped value is a private field, so it is only reachable from this module tree.
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
39
/// A LiveKit room handle bundled with the helpers this client keeps next to it.
///
/// Instances are created by `Room::connect`.
pub struct Room {
    /// The underlying LiveKit room.
    room: livekit::Room,
    /// Background task spawned in `Room::connect` that converts LiveKit events and forwards
    /// them to the receiver returned by `connect`. The field is never read; it only keeps the
    /// task owned by the room.
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and to play remote audio tracks.
    playback: playback::AudioStack,
}
45
/// Track identifier; an alias for [`livekit::id::TrackSid`].
pub type TrackSid = livekit::id::TrackSid;
/// Connection state of a room; an alias for [`livekit::ConnectionState`].
pub type ConnectionState = livekit::ConnectionState;
/// Identity of a participant, stored as a plain string.
///
/// Within this file it is always built from the inner string of LiveKit's participant
/// identity.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
50
51impl Room {
52 pub async fn connect(
53 url: String,
54 token: String,
55 cx: &mut AsyncApp,
56 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
57 let connector =
58 tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
59 let mut config = livekit::RoomOptions::default();
60 config.connector = Some(connector);
61 let (room, mut events) = Tokio::spawn(cx, async move {
62 livekit::Room::connect(&url, &token, config).await
63 })?
64 .await??;
65
66 let (mut tx, rx) = mpsc::unbounded();
67 let task = cx.background_executor().spawn(async move {
68 while let Some(event) = events.recv().await {
69 if let Some(event) = room_event_from_livekit(event) {
70 tx.send(event).await.ok();
71 }
72 }
73 });
74
75 Ok((
76 Self {
77 room,
78 _task: task,
79 playback: playback::AudioStack::new(cx.background_executor().clone()),
80 },
81 rx,
82 ))
83 }
84
85 pub fn local_participant(&self) -> LocalParticipant {
86 LocalParticipant(self.room.local_participant())
87 }
88
89 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
90 self.room
91 .remote_participants()
92 .into_iter()
93 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
94 .collect()
95 }
96
97 pub fn connection_state(&self) -> ConnectionState {
98 self.room.connection_state()
99 }
100
101 pub async fn publish_local_microphone_track(
102 &self,
103 user_name: String,
104 is_staff: bool,
105 cx: &mut AsyncApp,
106 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
107 let (track, stream) = self
108 .playback
109 .capture_local_microphone_track(user_name, is_staff, &cx)?;
110 let publication = self
111 .local_participant()
112 .publish_track(
113 livekit::track::LocalTrack::Audio(track.0),
114 livekit::options::TrackPublishOptions {
115 source: livekit::track::TrackSource::Microphone,
116 ..Default::default()
117 },
118 cx,
119 )
120 .await?;
121
122 Ok((publication, stream))
123 }
124
125 pub async fn unpublish_local_track(
126 &self,
127 sid: TrackSid,
128 cx: &mut AsyncApp,
129 ) -> Result<LocalTrackPublication> {
130 self.local_participant().unpublish_track(sid, cx).await
131 }
132
133 pub fn play_remote_audio_track(
134 &self,
135 track: &RemoteAudioTrack,
136 cx: &mut App,
137 ) -> Result<playback::AudioStream> {
138 let speaker: Speaker =
139 serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
140 name: track.0.name(),
141 is_staff: false,
142 sends_legacy_audio: true,
143 });
144
145 if AudioSettings::get_global(cx).rodio_audio {
146 info!("Using experimental.rodio_audio audio pipeline for output");
147 playback::play_remote_audio_track(&track.0, speaker, cx)
148 } else if speaker.sends_legacy_audio {
149 Ok(self.playback.play_remote_audio_track(&track.0))
150 } else {
151 Err(anyhow!("Client version too old to play audio in call"))
152 }
153 }
154}
155
156impl LocalParticipant {
157 pub async fn publish_screenshare_track(
158 &self,
159 source: &dyn ScreenCaptureSource,
160 cx: &mut AsyncApp,
161 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
162 let (track, stream) = capture_local_video_track(source, cx).await?;
163 let options = livekit::options::TrackPublishOptions {
164 source: livekit::track::TrackSource::Screenshare,
165 video_codec: livekit::options::VideoCodec::VP8,
166 ..Default::default()
167 };
168 let publication = self
169 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
170 .await?;
171
172 Ok((publication, stream))
173 }
174
175 async fn publish_track(
176 &self,
177 track: livekit::track::LocalTrack,
178 options: livekit::options::TrackPublishOptions,
179 cx: &mut AsyncApp,
180 ) -> Result<LocalTrackPublication> {
181 let participant = self.0.clone();
182 Tokio::spawn(cx, async move {
183 participant.publish_track(track, options).await
184 })?
185 .await?
186 .map(LocalTrackPublication)
187 .context("publishing a track")
188 }
189
190 pub async fn unpublish_track(
191 &self,
192 sid: TrackSid,
193 cx: &mut AsyncApp,
194 ) -> Result<LocalTrackPublication> {
195 let participant = self.0.clone();
196 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
197 .await?
198 .map(LocalTrackPublication)
199 .context("unpublishing a track")
200 }
201}
202
203impl LocalTrackPublication {
204 pub fn mute(&self, cx: &App) {
205 let track = self.0.clone();
206 Tokio::spawn(cx, async move {
207 track.mute();
208 })
209 .detach();
210 }
211
212 pub fn unmute(&self, cx: &App) {
213 let track = self.0.clone();
214 Tokio::spawn(cx, async move {
215 track.unmute();
216 })
217 .detach();
218 }
219
220 pub fn sid(&self) -> TrackSid {
221 self.0.sid()
222 }
223
224 pub fn is_muted(&self) -> bool {
225 self.0.is_muted()
226 }
227}
228
229impl RemoteParticipant {
230 pub fn identity(&self) -> ParticipantIdentity {
231 ParticipantIdentity(self.0.identity().0)
232 }
233
234 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
235 self.0
236 .track_publications()
237 .into_iter()
238 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
239 .collect()
240 }
241}
242
243impl RemoteAudioTrack {
244 pub fn sid(&self) -> TrackSid {
245 self.0.sid()
246 }
247}
248
249impl RemoteVideoTrack {
250 pub fn sid(&self) -> TrackSid {
251 self.0.sid()
252 }
253}
254
255impl RemoteTrackPublication {
256 pub fn is_muted(&self) -> bool {
257 self.0.is_muted()
258 }
259
260 pub fn is_enabled(&self) -> bool {
261 self.0.is_enabled()
262 }
263
264 pub fn track(&self) -> Option<RemoteTrack> {
265 self.0.track().map(remote_track_from_livekit)
266 }
267
268 pub fn is_audio(&self) -> bool {
269 self.0.kind() == livekit::track::TrackKind::Audio
270 }
271
272 pub fn set_enabled(&self, enabled: bool, cx: &App) {
273 let track = self.0.clone();
274 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
275 }
276
277 pub fn sid(&self) -> TrackSid {
278 self.0.sid()
279 }
280}
281
282impl Participant {
283 pub fn identity(&self) -> ParticipantIdentity {
284 match self {
285 Participant::Local(local_participant) => {
286 ParticipantIdentity(local_participant.0.identity().0)
287 }
288 Participant::Remote(remote_participant) => {
289 ParticipantIdentity(remote_participant.0.identity().0)
290 }
291 }
292 }
293}
294
295fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
296 match participant {
297 livekit::participant::Participant::Local(local) => {
298 Participant::Local(LocalParticipant(local))
299 }
300 livekit::participant::Participant::Remote(remote) => {
301 Participant::Remote(RemoteParticipant(remote))
302 }
303 }
304}
305
306fn publication_from_livekit(
307 publication: livekit::publication::TrackPublication,
308) -> TrackPublication {
309 match publication {
310 livekit::publication::TrackPublication::Local(local) => {
311 TrackPublication::Local(LocalTrackPublication(local))
312 }
313 livekit::publication::TrackPublication::Remote(remote) => {
314 TrackPublication::Remote(RemoteTrackPublication(remote))
315 }
316 }
317}
318
319fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
320 match track {
321 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
322 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
323 }
324}
325
326fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
327 match track {
328 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
329 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
330 }
331}
332fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
333 let event = match event {
334 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
335 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
336 }
337 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
338 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
339 }
340 livekit::RoomEvent::LocalTrackPublished {
341 publication,
342 track,
343 participant,
344 } => RoomEvent::LocalTrackPublished {
345 publication: LocalTrackPublication(publication),
346 track: local_track_from_livekit(track),
347 participant: LocalParticipant(participant),
348 },
349 livekit::RoomEvent::LocalTrackUnpublished {
350 publication,
351 participant,
352 } => RoomEvent::LocalTrackUnpublished {
353 publication: LocalTrackPublication(publication),
354 participant: LocalParticipant(participant),
355 },
356 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
357 track: local_track_from_livekit(track),
358 },
359 livekit::RoomEvent::TrackSubscribed {
360 track,
361 publication,
362 participant,
363 } => RoomEvent::TrackSubscribed {
364 track: remote_track_from_livekit(track),
365 publication: RemoteTrackPublication(publication),
366 participant: RemoteParticipant(participant),
367 },
368 livekit::RoomEvent::TrackUnsubscribed {
369 track,
370 publication,
371 participant,
372 } => RoomEvent::TrackUnsubscribed {
373 track: remote_track_from_livekit(track),
374 publication: RemoteTrackPublication(publication),
375 participant: RemoteParticipant(participant),
376 },
377 livekit::RoomEvent::TrackSubscriptionFailed {
378 participant,
379 error: _,
380 track_sid,
381 } => RoomEvent::TrackSubscriptionFailed {
382 participant: RemoteParticipant(participant),
383 track_sid,
384 },
385 livekit::RoomEvent::TrackPublished {
386 publication,
387 participant,
388 } => RoomEvent::TrackPublished {
389 publication: RemoteTrackPublication(publication),
390 participant: RemoteParticipant(participant),
391 },
392 livekit::RoomEvent::TrackUnpublished {
393 publication,
394 participant,
395 } => RoomEvent::TrackUnpublished {
396 publication: RemoteTrackPublication(publication),
397 participant: RemoteParticipant(participant),
398 },
399 livekit::RoomEvent::TrackMuted {
400 participant,
401 publication,
402 } => RoomEvent::TrackMuted {
403 publication: publication_from_livekit(publication),
404 participant: participant_from_livekit(participant),
405 },
406 livekit::RoomEvent::TrackUnmuted {
407 participant,
408 publication,
409 } => RoomEvent::TrackUnmuted {
410 publication: publication_from_livekit(publication),
411 participant: participant_from_livekit(participant),
412 },
413 livekit::RoomEvent::RoomMetadataChanged {
414 old_metadata,
415 metadata,
416 } => RoomEvent::RoomMetadataChanged {
417 old_metadata,
418 metadata,
419 },
420 livekit::RoomEvent::ParticipantMetadataChanged {
421 participant,
422 old_metadata,
423 metadata,
424 } => RoomEvent::ParticipantMetadataChanged {
425 participant: participant_from_livekit(participant),
426 old_metadata,
427 metadata,
428 },
429 livekit::RoomEvent::ParticipantNameChanged {
430 participant,
431 old_name,
432 name,
433 } => RoomEvent::ParticipantNameChanged {
434 participant: participant_from_livekit(participant),
435 old_name,
436 name,
437 },
438 livekit::RoomEvent::ParticipantAttributesChanged {
439 participant,
440 changed_attributes,
441 } => RoomEvent::ParticipantAttributesChanged {
442 participant: participant_from_livekit(participant),
443 changed_attributes: changed_attributes.into_iter().collect(),
444 },
445 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
446 RoomEvent::ActiveSpeakersChanged {
447 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
448 }
449 }
450 livekit::RoomEvent::Connected {
451 participants_with_tracks,
452 } => RoomEvent::Connected {
453 participants_with_tracks: participants_with_tracks
454 .into_iter()
455 .map({
456 |(p, t)| {
457 (
458 RemoteParticipant(p),
459 t.into_iter().map(RemoteTrackPublication).collect(),
460 )
461 }
462 })
463 .collect(),
464 },
465 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
466 reason: reason.as_str_name(),
467 },
468 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
469 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
470 _ => {
471 log::trace!("dropping livekit event: {:?}", event);
472 return None;
473 }
474 };
475
476 Some(event)
477}