1use anyhow::{Context as _, Result, anyhow};
2use audio::AudioSettings;
3use collections::HashMap;
4use futures::{SinkExt, channel::mpsc};
5use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
6use gpui_tokio::Tokio;
7use log::info;
8use playback::capture_local_video_track;
9use settings::Settings;
10
11mod playback;
12
13use crate::{
14 LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
15 livekit_client::playback::Speaker,
16};
17pub use playback::AudioStream;
18pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
19
20#[derive(Clone, Debug)]
21pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
22#[derive(Clone, Debug)]
23pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
24#[derive(Clone, Debug)]
25pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
26#[derive(Clone, Debug)]
27pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
28
29#[derive(Clone, Debug)]
30pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
31#[derive(Clone, Debug)]
32pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
33#[derive(Clone, Debug)]
34pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
35#[derive(Clone, Debug)]
36pub struct LocalParticipant(livekit::participant::LocalParticipant);
37
38pub struct Room {
39 room: livekit::Room,
40 _task: Task<()>,
41 playback: playback::AudioStack,
42}
43
44pub type TrackSid = livekit::id::TrackSid;
45pub type ConnectionState = livekit::ConnectionState;
46#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
47pub struct ParticipantIdentity(pub String);
48
49impl Room {
50 pub async fn connect(
51 url: String,
52 token: String,
53 cx: &mut AsyncApp,
54 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
55 let mut config = livekit::RoomOptions::default();
56 config.tls_config = livekit::TlsConfig(Some(http_client_tls::tls_config()));
57 let (room, mut events) = Tokio::spawn(cx, async move {
58 livekit::Room::connect(&url, &token, config).await
59 })
60 .await??;
61
62 let (mut tx, rx) = mpsc::unbounded();
63 let task = cx.background_executor().spawn(async move {
64 while let Some(event) = events.recv().await {
65 if let Some(event) = room_event_from_livekit(event) {
66 tx.send(event).await.ok();
67 }
68 }
69 });
70
71 Ok((
72 Self {
73 room,
74 _task: task,
75 playback: playback::AudioStack::new(cx.background_executor().clone()),
76 },
77 rx,
78 ))
79 }
80
81 pub fn local_participant(&self) -> LocalParticipant {
82 LocalParticipant(self.room.local_participant())
83 }
84
85 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
86 self.room
87 .remote_participants()
88 .into_iter()
89 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
90 .collect()
91 }
92
93 pub fn connection_state(&self) -> ConnectionState {
94 self.room.connection_state()
95 }
96
97 pub fn name(&self) -> String {
98 self.room.name()
99 }
100
101 pub async fn sid(&self) -> String {
102 self.room.sid().await.to_string()
103 }
104
105 pub async fn publish_local_microphone_track(
106 &self,
107 user_name: String,
108 is_staff: bool,
109 cx: &mut AsyncApp,
110 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
111 let (track, stream) = self
112 .playback
113 .capture_local_microphone_track(user_name, is_staff, &cx)?;
114 let publication = self
115 .local_participant()
116 .publish_track(
117 livekit::track::LocalTrack::Audio(track.0),
118 livekit::options::TrackPublishOptions {
119 source: livekit::track::TrackSource::Microphone,
120 ..Default::default()
121 },
122 cx,
123 )
124 .await?;
125
126 Ok((publication, stream))
127 }
128
129 pub async fn unpublish_local_track(
130 &self,
131 sid: TrackSid,
132 cx: &mut AsyncApp,
133 ) -> Result<LocalTrackPublication> {
134 self.local_participant().unpublish_track(sid, cx).await
135 }
136
137 pub fn play_remote_audio_track(
138 &self,
139 track: &RemoteAudioTrack,
140 cx: &mut App,
141 ) -> Result<playback::AudioStream> {
142 let speaker: Speaker =
143 serde_urlencoded::from_str(&track.0.name()).unwrap_or_else(|_| Speaker {
144 name: track.0.name(),
145 is_staff: false,
146 sends_legacy_audio: true,
147 });
148
149 if AudioSettings::get_global(cx).rodio_audio {
150 info!("Using experimental.rodio_audio audio pipeline for output");
151 playback::play_remote_audio_track(&track.0, speaker, cx)
152 } else if speaker.sends_legacy_audio {
153 Ok(self.playback.play_remote_audio_track(&track.0))
154 } else {
155 Err(anyhow!("Client version too old to play audio in call"))
156 }
157 }
158}
159
160impl LocalParticipant {
161 pub async fn publish_screenshare_track(
162 &self,
163 source: &dyn ScreenCaptureSource,
164 cx: &mut AsyncApp,
165 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
166 let (track, stream) = capture_local_video_track(source, cx).await?;
167 let options = livekit::options::TrackPublishOptions {
168 source: livekit::track::TrackSource::Screenshare,
169 video_codec: livekit::options::VideoCodec::VP8,
170 ..Default::default()
171 };
172 let publication = self
173 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
174 .await?;
175
176 Ok((publication, stream))
177 }
178
179 async fn publish_track(
180 &self,
181 track: livekit::track::LocalTrack,
182 options: livekit::options::TrackPublishOptions,
183 cx: &mut AsyncApp,
184 ) -> Result<LocalTrackPublication> {
185 let participant = self.0.clone();
186 Tokio::spawn(cx, async move {
187 participant.publish_track(track, options).await
188 })
189 .await?
190 .map(LocalTrackPublication)
191 .context("publishing a track")
192 }
193
194 pub async fn unpublish_track(
195 &self,
196 sid: TrackSid,
197 cx: &mut AsyncApp,
198 ) -> Result<LocalTrackPublication> {
199 let participant = self.0.clone();
200 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })
201 .await?
202 .map(LocalTrackPublication)
203 .context("unpublishing a track")
204 }
205}
206
207impl LocalTrackPublication {
208 pub fn mute(&self, cx: &App) {
209 let track = self.0.clone();
210 Tokio::spawn(cx, async move {
211 track.mute();
212 })
213 .detach();
214 }
215
216 pub fn unmute(&self, cx: &App) {
217 let track = self.0.clone();
218 Tokio::spawn(cx, async move {
219 track.unmute();
220 })
221 .detach();
222 }
223
224 pub fn sid(&self) -> TrackSid {
225 self.0.sid()
226 }
227
228 pub fn is_muted(&self) -> bool {
229 self.0.is_muted()
230 }
231}
232
233impl RemoteParticipant {
234 pub fn identity(&self) -> ParticipantIdentity {
235 ParticipantIdentity(self.0.identity().0)
236 }
237
238 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
239 self.0
240 .track_publications()
241 .into_iter()
242 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
243 .collect()
244 }
245}
246
247impl RemoteAudioTrack {
248 pub fn sid(&self) -> TrackSid {
249 self.0.sid()
250 }
251}
252
253impl RemoteVideoTrack {
254 pub fn sid(&self) -> TrackSid {
255 self.0.sid()
256 }
257}
258
259impl RemoteTrackPublication {
260 pub fn is_muted(&self) -> bool {
261 self.0.is_muted()
262 }
263
264 pub fn is_enabled(&self) -> bool {
265 self.0.is_enabled()
266 }
267
268 pub fn track(&self) -> Option<RemoteTrack> {
269 self.0.track().map(remote_track_from_livekit)
270 }
271
272 pub fn is_audio(&self) -> bool {
273 self.0.kind() == livekit::track::TrackKind::Audio
274 }
275
276 pub fn set_enabled(&self, enabled: bool, cx: &App) {
277 let track = self.0.clone();
278 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
279 }
280
281 pub fn sid(&self) -> TrackSid {
282 self.0.sid()
283 }
284}
285
286impl Participant {
287 pub fn identity(&self) -> ParticipantIdentity {
288 match self {
289 Participant::Local(local_participant) => {
290 ParticipantIdentity(local_participant.0.identity().0)
291 }
292 Participant::Remote(remote_participant) => {
293 ParticipantIdentity(remote_participant.0.identity().0)
294 }
295 }
296 }
297}
298
299fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
300 match participant {
301 livekit::participant::Participant::Local(local) => {
302 Participant::Local(LocalParticipant(local))
303 }
304 livekit::participant::Participant::Remote(remote) => {
305 Participant::Remote(RemoteParticipant(remote))
306 }
307 }
308}
309
310fn publication_from_livekit(
311 publication: livekit::publication::TrackPublication,
312) -> TrackPublication {
313 match publication {
314 livekit::publication::TrackPublication::Local(local) => {
315 TrackPublication::Local(LocalTrackPublication(local))
316 }
317 livekit::publication::TrackPublication::Remote(remote) => {
318 TrackPublication::Remote(RemoteTrackPublication(remote))
319 }
320 }
321}
322
323fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
324 match track {
325 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
326 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
327 }
328}
329
330fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
331 match track {
332 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
333 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
334 }
335}
336fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
337 let event = match event {
338 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
339 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
340 }
341 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
342 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
343 }
344 livekit::RoomEvent::LocalTrackPublished {
345 publication,
346 track,
347 participant,
348 } => RoomEvent::LocalTrackPublished {
349 publication: LocalTrackPublication(publication),
350 track: local_track_from_livekit(track),
351 participant: LocalParticipant(participant),
352 },
353 livekit::RoomEvent::LocalTrackUnpublished {
354 publication,
355 participant,
356 } => RoomEvent::LocalTrackUnpublished {
357 publication: LocalTrackPublication(publication),
358 participant: LocalParticipant(participant),
359 },
360 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
361 track: local_track_from_livekit(track),
362 },
363 livekit::RoomEvent::TrackSubscribed {
364 track,
365 publication,
366 participant,
367 } => RoomEvent::TrackSubscribed {
368 track: remote_track_from_livekit(track),
369 publication: RemoteTrackPublication(publication),
370 participant: RemoteParticipant(participant),
371 },
372 livekit::RoomEvent::TrackUnsubscribed {
373 track,
374 publication,
375 participant,
376 } => RoomEvent::TrackUnsubscribed {
377 track: remote_track_from_livekit(track),
378 publication: RemoteTrackPublication(publication),
379 participant: RemoteParticipant(participant),
380 },
381 livekit::RoomEvent::TrackSubscriptionFailed {
382 participant,
383 error: _,
384 track_sid,
385 } => RoomEvent::TrackSubscriptionFailed {
386 participant: RemoteParticipant(participant),
387 track_sid,
388 },
389 livekit::RoomEvent::TrackPublished {
390 publication,
391 participant,
392 } => RoomEvent::TrackPublished {
393 publication: RemoteTrackPublication(publication),
394 participant: RemoteParticipant(participant),
395 },
396 livekit::RoomEvent::TrackUnpublished {
397 publication,
398 participant,
399 } => RoomEvent::TrackUnpublished {
400 publication: RemoteTrackPublication(publication),
401 participant: RemoteParticipant(participant),
402 },
403 livekit::RoomEvent::TrackMuted {
404 participant,
405 publication,
406 } => RoomEvent::TrackMuted {
407 publication: publication_from_livekit(publication),
408 participant: participant_from_livekit(participant),
409 },
410 livekit::RoomEvent::TrackUnmuted {
411 participant,
412 publication,
413 } => RoomEvent::TrackUnmuted {
414 publication: publication_from_livekit(publication),
415 participant: participant_from_livekit(participant),
416 },
417 livekit::RoomEvent::RoomMetadataChanged {
418 old_metadata,
419 metadata,
420 } => RoomEvent::RoomMetadataChanged {
421 old_metadata,
422 metadata,
423 },
424 livekit::RoomEvent::ParticipantMetadataChanged {
425 participant,
426 old_metadata,
427 metadata,
428 } => RoomEvent::ParticipantMetadataChanged {
429 participant: participant_from_livekit(participant),
430 old_metadata,
431 metadata,
432 },
433 livekit::RoomEvent::ParticipantNameChanged {
434 participant,
435 old_name,
436 name,
437 } => RoomEvent::ParticipantNameChanged {
438 participant: participant_from_livekit(participant),
439 old_name,
440 name,
441 },
442 livekit::RoomEvent::ParticipantAttributesChanged {
443 participant,
444 changed_attributes,
445 } => RoomEvent::ParticipantAttributesChanged {
446 participant: participant_from_livekit(participant),
447 changed_attributes: changed_attributes.into_iter().collect(),
448 },
449 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
450 RoomEvent::ActiveSpeakersChanged {
451 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
452 }
453 }
454 livekit::RoomEvent::Connected {
455 participants_with_tracks,
456 } => RoomEvent::Connected {
457 participants_with_tracks: participants_with_tracks
458 .into_iter()
459 .map({
460 |(p, t)| {
461 (
462 RemoteParticipant(p),
463 t.into_iter().map(RemoteTrackPublication).collect(),
464 )
465 }
466 })
467 .collect(),
468 },
469 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
470 reason: reason.as_str_name(),
471 },
472 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
473 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
474 _ => {
475 log::trace!("dropping livekit event: {:?}", event);
476 return None;
477 }
478 };
479
480 Some(event)
481}