1use std::sync::Arc;
2
3use anyhow::{Context as _, Result};
4use audio::AudioSettings;
5use collections::HashMap;
6use futures::{SinkExt, channel::mpsc};
7use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
8use gpui_tokio::Tokio;
9use log::info;
10use playback::capture_local_video_track;
11use settings::Settings;
12
13mod playback;
14
15use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
16pub use playback::AudioStream;
17pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
18
19#[derive(Clone, Debug)]
20pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
21#[derive(Clone, Debug)]
22pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
23#[derive(Clone, Debug)]
24pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
25#[derive(Clone, Debug)]
26pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
27
28#[derive(Clone, Debug)]
29pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
30#[derive(Clone, Debug)]
31pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
32#[derive(Clone, Debug)]
33pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
34#[derive(Clone, Debug)]
35pub struct LocalParticipant(livekit::participant::LocalParticipant);
36
37pub struct Room {
38 room: livekit::Room,
39 _task: Task<()>,
40 playback: playback::AudioStack,
41}
42
43pub type TrackSid = livekit::id::TrackSid;
44pub type ConnectionState = livekit::ConnectionState;
/// Identity of a room participant, stored as an owned string.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ParticipantIdentity(pub String);
47
48impl Room {
49 pub async fn connect(
50 url: String,
51 token: String,
52 cx: &mut AsyncApp,
53 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
54 let connector =
55 tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
56 let mut config = livekit::RoomOptions::default();
57 config.connector = Some(connector);
58 let (room, mut events) = Tokio::spawn(cx, async move {
59 livekit::Room::connect(&url, &token, config).await
60 })?
61 .await??;
62
63 let (mut tx, rx) = mpsc::unbounded();
64 let task = cx.background_executor().spawn(async move {
65 while let Some(event) = events.recv().await {
66 if let Some(event) = room_event_from_livekit(event) {
67 tx.send(event).await.ok();
68 }
69 }
70 });
71
72 Ok((
73 Self {
74 room,
75 _task: task,
76 playback: playback::AudioStack::new(cx.background_executor().clone()),
77 },
78 rx,
79 ))
80 }
81
82 pub fn local_participant(&self) -> LocalParticipant {
83 LocalParticipant(self.room.local_participant())
84 }
85
86 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
87 self.room
88 .remote_participants()
89 .into_iter()
90 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
91 .collect()
92 }
93
94 pub fn connection_state(&self) -> ConnectionState {
95 self.room.connection_state()
96 }
97
98 pub async fn publish_local_microphone_track(
99 &self,
100 user_name: String,
101 is_staff: bool,
102 cx: &mut AsyncApp,
103 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
104 let (track, stream) = self
105 .playback
106 .capture_local_microphone_track(user_name, is_staff, &cx)?;
107 let publication = self
108 .local_participant()
109 .publish_track(
110 livekit::track::LocalTrack::Audio(track.0),
111 livekit::options::TrackPublishOptions {
112 source: livekit::track::TrackSource::Microphone,
113 ..Default::default()
114 },
115 cx,
116 )
117 .await?;
118
119 Ok((publication, stream))
120 }
121
122 pub async fn unpublish_local_track(
123 &self,
124 sid: TrackSid,
125 cx: &mut AsyncApp,
126 ) -> Result<LocalTrackPublication> {
127 self.local_participant().unpublish_track(sid, cx).await
128 }
129
130 pub fn play_remote_audio_track(
131 &self,
132 track: &RemoteAudioTrack,
133 cx: &mut App,
134 ) -> Result<playback::AudioStream> {
135 if AudioSettings::get_global(cx).rodio_audio {
136 info!("Using experimental.rodio_audio audio pipeline for output");
137 playback::play_remote_audio_track(&track.0, cx)
138 } else {
139 Ok(self.playback.play_remote_audio_track(&track.0))
140 }
141 }
142}
143
144impl LocalParticipant {
145 pub async fn publish_screenshare_track(
146 &self,
147 source: &dyn ScreenCaptureSource,
148 cx: &mut AsyncApp,
149 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
150 let (track, stream) = capture_local_video_track(source, cx).await?;
151 let options = livekit::options::TrackPublishOptions {
152 source: livekit::track::TrackSource::Screenshare,
153 video_codec: livekit::options::VideoCodec::VP8,
154 ..Default::default()
155 };
156 let publication = self
157 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
158 .await?;
159
160 Ok((publication, stream))
161 }
162
163 async fn publish_track(
164 &self,
165 track: livekit::track::LocalTrack,
166 options: livekit::options::TrackPublishOptions,
167 cx: &mut AsyncApp,
168 ) -> Result<LocalTrackPublication> {
169 let participant = self.0.clone();
170 Tokio::spawn(cx, async move {
171 participant.publish_track(track, options).await
172 })?
173 .await?
174 .map(LocalTrackPublication)
175 .context("publishing a track")
176 }
177
178 pub async fn unpublish_track(
179 &self,
180 sid: TrackSid,
181 cx: &mut AsyncApp,
182 ) -> Result<LocalTrackPublication> {
183 let participant = self.0.clone();
184 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
185 .await?
186 .map(LocalTrackPublication)
187 .context("unpublishing a track")
188 }
189}
190
191impl LocalTrackPublication {
192 pub fn mute(&self, cx: &App) {
193 let track = self.0.clone();
194 Tokio::spawn(cx, async move {
195 track.mute();
196 })
197 .detach();
198 }
199
200 pub fn unmute(&self, cx: &App) {
201 let track = self.0.clone();
202 Tokio::spawn(cx, async move {
203 track.unmute();
204 })
205 .detach();
206 }
207
208 pub fn sid(&self) -> TrackSid {
209 self.0.sid()
210 }
211
212 pub fn is_muted(&self) -> bool {
213 self.0.is_muted()
214 }
215}
216
217impl RemoteParticipant {
218 pub fn identity(&self) -> ParticipantIdentity {
219 ParticipantIdentity(self.0.identity().0)
220 }
221
222 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
223 self.0
224 .track_publications()
225 .into_iter()
226 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
227 .collect()
228 }
229}
230
231impl RemoteAudioTrack {
232 pub fn sid(&self) -> TrackSid {
233 self.0.sid()
234 }
235}
236
237impl RemoteVideoTrack {
238 pub fn sid(&self) -> TrackSid {
239 self.0.sid()
240 }
241}
242
243impl RemoteTrackPublication {
244 pub fn is_muted(&self) -> bool {
245 self.0.is_muted()
246 }
247
248 pub fn is_enabled(&self) -> bool {
249 self.0.is_enabled()
250 }
251
252 pub fn track(&self) -> Option<RemoteTrack> {
253 self.0.track().map(remote_track_from_livekit)
254 }
255
256 pub fn is_audio(&self) -> bool {
257 self.0.kind() == livekit::track::TrackKind::Audio
258 }
259
260 pub fn set_enabled(&self, enabled: bool, cx: &App) {
261 let track = self.0.clone();
262 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
263 }
264
265 pub fn sid(&self) -> TrackSid {
266 self.0.sid()
267 }
268}
269
270impl Participant {
271 pub fn identity(&self) -> ParticipantIdentity {
272 match self {
273 Participant::Local(local_participant) => {
274 ParticipantIdentity(local_participant.0.identity().0)
275 }
276 Participant::Remote(remote_participant) => {
277 ParticipantIdentity(remote_participant.0.identity().0)
278 }
279 }
280 }
281}
282
283fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
284 match participant {
285 livekit::participant::Participant::Local(local) => {
286 Participant::Local(LocalParticipant(local))
287 }
288 livekit::participant::Participant::Remote(remote) => {
289 Participant::Remote(RemoteParticipant(remote))
290 }
291 }
292}
293
294fn publication_from_livekit(
295 publication: livekit::publication::TrackPublication,
296) -> TrackPublication {
297 match publication {
298 livekit::publication::TrackPublication::Local(local) => {
299 TrackPublication::Local(LocalTrackPublication(local))
300 }
301 livekit::publication::TrackPublication::Remote(remote) => {
302 TrackPublication::Remote(RemoteTrackPublication(remote))
303 }
304 }
305}
306
307fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
308 match track {
309 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
310 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
311 }
312}
313
314fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
315 match track {
316 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
317 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
318 }
319}
320fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
321 let event = match event {
322 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
323 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
324 }
325 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
326 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
327 }
328 livekit::RoomEvent::LocalTrackPublished {
329 publication,
330 track,
331 participant,
332 } => RoomEvent::LocalTrackPublished {
333 publication: LocalTrackPublication(publication),
334 track: local_track_from_livekit(track),
335 participant: LocalParticipant(participant),
336 },
337 livekit::RoomEvent::LocalTrackUnpublished {
338 publication,
339 participant,
340 } => RoomEvent::LocalTrackUnpublished {
341 publication: LocalTrackPublication(publication),
342 participant: LocalParticipant(participant),
343 },
344 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
345 track: local_track_from_livekit(track),
346 },
347 livekit::RoomEvent::TrackSubscribed {
348 track,
349 publication,
350 participant,
351 } => RoomEvent::TrackSubscribed {
352 track: remote_track_from_livekit(track),
353 publication: RemoteTrackPublication(publication),
354 participant: RemoteParticipant(participant),
355 },
356 livekit::RoomEvent::TrackUnsubscribed {
357 track,
358 publication,
359 participant,
360 } => RoomEvent::TrackUnsubscribed {
361 track: remote_track_from_livekit(track),
362 publication: RemoteTrackPublication(publication),
363 participant: RemoteParticipant(participant),
364 },
365 livekit::RoomEvent::TrackSubscriptionFailed {
366 participant,
367 error: _,
368 track_sid,
369 } => RoomEvent::TrackSubscriptionFailed {
370 participant: RemoteParticipant(participant),
371 track_sid,
372 },
373 livekit::RoomEvent::TrackPublished {
374 publication,
375 participant,
376 } => RoomEvent::TrackPublished {
377 publication: RemoteTrackPublication(publication),
378 participant: RemoteParticipant(participant),
379 },
380 livekit::RoomEvent::TrackUnpublished {
381 publication,
382 participant,
383 } => RoomEvent::TrackUnpublished {
384 publication: RemoteTrackPublication(publication),
385 participant: RemoteParticipant(participant),
386 },
387 livekit::RoomEvent::TrackMuted {
388 participant,
389 publication,
390 } => RoomEvent::TrackMuted {
391 publication: publication_from_livekit(publication),
392 participant: participant_from_livekit(participant),
393 },
394 livekit::RoomEvent::TrackUnmuted {
395 participant,
396 publication,
397 } => RoomEvent::TrackUnmuted {
398 publication: publication_from_livekit(publication),
399 participant: participant_from_livekit(participant),
400 },
401 livekit::RoomEvent::RoomMetadataChanged {
402 old_metadata,
403 metadata,
404 } => RoomEvent::RoomMetadataChanged {
405 old_metadata,
406 metadata,
407 },
408 livekit::RoomEvent::ParticipantMetadataChanged {
409 participant,
410 old_metadata,
411 metadata,
412 } => RoomEvent::ParticipantMetadataChanged {
413 participant: participant_from_livekit(participant),
414 old_metadata,
415 metadata,
416 },
417 livekit::RoomEvent::ParticipantNameChanged {
418 participant,
419 old_name,
420 name,
421 } => RoomEvent::ParticipantNameChanged {
422 participant: participant_from_livekit(participant),
423 old_name,
424 name,
425 },
426 livekit::RoomEvent::ParticipantAttributesChanged {
427 participant,
428 changed_attributes,
429 } => RoomEvent::ParticipantAttributesChanged {
430 participant: participant_from_livekit(participant),
431 changed_attributes: changed_attributes.into_iter().collect(),
432 },
433 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
434 RoomEvent::ActiveSpeakersChanged {
435 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
436 }
437 }
438 livekit::RoomEvent::Connected {
439 participants_with_tracks,
440 } => RoomEvent::Connected {
441 participants_with_tracks: participants_with_tracks
442 .into_iter()
443 .map({
444 |(p, t)| {
445 (
446 RemoteParticipant(p),
447 t.into_iter().map(RemoteTrackPublication).collect(),
448 )
449 }
450 })
451 .collect(),
452 },
453 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
454 reason: reason.as_str_name(),
455 },
456 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
457 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
458 _ => {
459 log::trace!("dropping livekit event: {:?}", event);
460 return None;
461 }
462 };
463
464 Some(event)
465}