1use std::sync::Arc;
2
3use anyhow::{Context as _, Result};
4use audio::AudioSettings;
5use collections::HashMap;
6use futures::{SinkExt, channel::mpsc};
7use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
8use gpui_tokio::Tokio;
9use log::info;
10use playback::capture_local_video_track;
11use settings::Settings;
12
13mod playback;
14
15use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
16pub use playback::AudioStream;
17pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
18
/// Newtype wrapper around a LiveKit remote video track.
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
/// Newtype wrapper around a LiveKit remote audio track.
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
/// Newtype wrapper around a LiveKit publication of a remote track.
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
/// Newtype wrapper around a LiveKit remote participant.
#[derive(Clone, Debug)]
pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
27
/// Newtype wrapper around a LiveKit local video track.
#[derive(Clone, Debug)]
pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
/// Newtype wrapper around a LiveKit local audio track.
#[derive(Clone, Debug)]
pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
/// Newtype wrapper around a LiveKit publication of a local track.
#[derive(Clone, Debug)]
pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
/// Newtype wrapper around the LiveKit local participant.
#[derive(Clone, Debug)]
pub struct LocalParticipant(livekit::participant::LocalParticipant);
36
/// A connected LiveKit room together with the resources created for it by
/// `Room::connect`.
pub struct Room {
    /// The underlying LiveKit room handle.
    room: livekit::Room,
    /// Background task that forwards translated LiveKit events to the receiver
    /// returned by `Room::connect`. It is never read; presumably it is stored
    /// only so the task stays alive for the lifetime of the room — TODO confirm
    /// against gpui's `Task` drop semantics.
    _task: Task<()>,
    /// Audio stack used to capture the local microphone and, on the default
    /// pipeline, to play remote audio tracks.
    playback: playback::AudioStack,
}
42
/// Track identifier, re-exported from LiveKit.
pub type TrackSid = livekit::id::TrackSid;
/// Room connection state, re-exported from LiveKit.
pub type ConnectionState = livekit::ConnectionState;
/// Crate-level participant identity holding the identity string taken from
/// LiveKit's own identity type.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ParticipantIdentity(pub String);
47
48impl Room {
49 pub async fn connect(
50 url: String,
51 token: String,
52 cx: &mut AsyncApp,
53 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
54 let connector =
55 tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
56 let mut config = livekit::RoomOptions::default();
57 config.connector = Some(connector);
58 let (room, mut events) = Tokio::spawn(cx, async move {
59 livekit::Room::connect(&url, &token, config).await
60 })?
61 .await??;
62
63 let (mut tx, rx) = mpsc::unbounded();
64 let task = cx.background_executor().spawn(async move {
65 while let Some(event) = events.recv().await {
66 if let Some(event) = room_event_from_livekit(event) {
67 tx.send(event).await.ok();
68 }
69 }
70 });
71
72 Ok((
73 Self {
74 room,
75 _task: task,
76 playback: playback::AudioStack::new(cx.background_executor().clone()),
77 },
78 rx,
79 ))
80 }
81
82 pub fn local_participant(&self) -> LocalParticipant {
83 LocalParticipant(self.room.local_participant())
84 }
85
86 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
87 self.room
88 .remote_participants()
89 .into_iter()
90 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
91 .collect()
92 }
93
94 pub fn connection_state(&self) -> ConnectionState {
95 self.room.connection_state()
96 }
97
98 pub async fn publish_local_microphone_track(
99 &self,
100 cx: &mut AsyncApp,
101 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
102 let (track, stream) = self.playback.capture_local_microphone_track()?;
103 let publication = self
104 .local_participant()
105 .publish_track(
106 livekit::track::LocalTrack::Audio(track.0),
107 livekit::options::TrackPublishOptions {
108 source: livekit::track::TrackSource::Microphone,
109 ..Default::default()
110 },
111 cx,
112 )
113 .await?;
114
115 Ok((publication, stream))
116 }
117
118 pub async fn unpublish_local_track(
119 &self,
120 sid: TrackSid,
121 cx: &mut AsyncApp,
122 ) -> Result<LocalTrackPublication> {
123 self.local_participant().unpublish_track(sid, cx).await
124 }
125
126 pub fn play_remote_audio_track(
127 &self,
128 track: &RemoteAudioTrack,
129 cx: &mut App,
130 ) -> Result<playback::AudioStream> {
131 if AudioSettings::get_global(cx).rodio_audio {
132 info!("Using experimental.rodio_audio audio pipeline");
133 playback::play_remote_audio_track(&track.0, cx)
134 } else {
135 Ok(self.playback.play_remote_audio_track(&track.0))
136 }
137 }
138}
139
140impl LocalParticipant {
141 pub async fn publish_screenshare_track(
142 &self,
143 source: &dyn ScreenCaptureSource,
144 cx: &mut AsyncApp,
145 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
146 let (track, stream) = capture_local_video_track(source, cx).await?;
147 let options = livekit::options::TrackPublishOptions {
148 source: livekit::track::TrackSource::Screenshare,
149 video_codec: livekit::options::VideoCodec::VP8,
150 ..Default::default()
151 };
152 let publication = self
153 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
154 .await?;
155
156 Ok((publication, stream))
157 }
158
159 async fn publish_track(
160 &self,
161 track: livekit::track::LocalTrack,
162 options: livekit::options::TrackPublishOptions,
163 cx: &mut AsyncApp,
164 ) -> Result<LocalTrackPublication> {
165 let participant = self.0.clone();
166 Tokio::spawn(cx, async move {
167 participant.publish_track(track, options).await
168 })?
169 .await?
170 .map(LocalTrackPublication)
171 .context("publishing a track")
172 }
173
174 pub async fn unpublish_track(
175 &self,
176 sid: TrackSid,
177 cx: &mut AsyncApp,
178 ) -> Result<LocalTrackPublication> {
179 let participant = self.0.clone();
180 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
181 .await?
182 .map(LocalTrackPublication)
183 .context("unpublishing a track")
184 }
185}
186
187impl LocalTrackPublication {
188 pub fn mute(&self, cx: &App) {
189 let track = self.0.clone();
190 Tokio::spawn(cx, async move {
191 track.mute();
192 })
193 .detach();
194 }
195
196 pub fn unmute(&self, cx: &App) {
197 let track = self.0.clone();
198 Tokio::spawn(cx, async move {
199 track.unmute();
200 })
201 .detach();
202 }
203
204 pub fn sid(&self) -> TrackSid {
205 self.0.sid()
206 }
207
208 pub fn is_muted(&self) -> bool {
209 self.0.is_muted()
210 }
211}
212
213impl RemoteParticipant {
214 pub fn identity(&self) -> ParticipantIdentity {
215 ParticipantIdentity(self.0.identity().0)
216 }
217
218 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
219 self.0
220 .track_publications()
221 .into_iter()
222 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
223 .collect()
224 }
225}
226
227impl RemoteAudioTrack {
228 pub fn sid(&self) -> TrackSid {
229 self.0.sid()
230 }
231}
232
233impl RemoteVideoTrack {
234 pub fn sid(&self) -> TrackSid {
235 self.0.sid()
236 }
237}
238
239impl RemoteTrackPublication {
240 pub fn is_muted(&self) -> bool {
241 self.0.is_muted()
242 }
243
244 pub fn is_enabled(&self) -> bool {
245 self.0.is_enabled()
246 }
247
248 pub fn track(&self) -> Option<RemoteTrack> {
249 self.0.track().map(remote_track_from_livekit)
250 }
251
252 pub fn is_audio(&self) -> bool {
253 self.0.kind() == livekit::track::TrackKind::Audio
254 }
255
256 pub fn set_enabled(&self, enabled: bool, cx: &App) {
257 let track = self.0.clone();
258 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
259 }
260
261 pub fn sid(&self) -> TrackSid {
262 self.0.sid()
263 }
264}
265
266impl Participant {
267 pub fn identity(&self) -> ParticipantIdentity {
268 match self {
269 Participant::Local(local_participant) => {
270 ParticipantIdentity(local_participant.0.identity().0)
271 }
272 Participant::Remote(remote_participant) => {
273 ParticipantIdentity(remote_participant.0.identity().0)
274 }
275 }
276 }
277}
278
279fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
280 match participant {
281 livekit::participant::Participant::Local(local) => {
282 Participant::Local(LocalParticipant(local))
283 }
284 livekit::participant::Participant::Remote(remote) => {
285 Participant::Remote(RemoteParticipant(remote))
286 }
287 }
288}
289
290fn publication_from_livekit(
291 publication: livekit::publication::TrackPublication,
292) -> TrackPublication {
293 match publication {
294 livekit::publication::TrackPublication::Local(local) => {
295 TrackPublication::Local(LocalTrackPublication(local))
296 }
297 livekit::publication::TrackPublication::Remote(remote) => {
298 TrackPublication::Remote(RemoteTrackPublication(remote))
299 }
300 }
301}
302
303fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
304 match track {
305 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
306 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
307 }
308}
309
310fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
311 match track {
312 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
313 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
314 }
315}
316fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
317 let event = match event {
318 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
319 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
320 }
321 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
322 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
323 }
324 livekit::RoomEvent::LocalTrackPublished {
325 publication,
326 track,
327 participant,
328 } => RoomEvent::LocalTrackPublished {
329 publication: LocalTrackPublication(publication),
330 track: local_track_from_livekit(track),
331 participant: LocalParticipant(participant),
332 },
333 livekit::RoomEvent::LocalTrackUnpublished {
334 publication,
335 participant,
336 } => RoomEvent::LocalTrackUnpublished {
337 publication: LocalTrackPublication(publication),
338 participant: LocalParticipant(participant),
339 },
340 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
341 track: local_track_from_livekit(track),
342 },
343 livekit::RoomEvent::TrackSubscribed {
344 track,
345 publication,
346 participant,
347 } => RoomEvent::TrackSubscribed {
348 track: remote_track_from_livekit(track),
349 publication: RemoteTrackPublication(publication),
350 participant: RemoteParticipant(participant),
351 },
352 livekit::RoomEvent::TrackUnsubscribed {
353 track,
354 publication,
355 participant,
356 } => RoomEvent::TrackUnsubscribed {
357 track: remote_track_from_livekit(track),
358 publication: RemoteTrackPublication(publication),
359 participant: RemoteParticipant(participant),
360 },
361 livekit::RoomEvent::TrackSubscriptionFailed {
362 participant,
363 error: _,
364 track_sid,
365 } => RoomEvent::TrackSubscriptionFailed {
366 participant: RemoteParticipant(participant),
367 track_sid,
368 },
369 livekit::RoomEvent::TrackPublished {
370 publication,
371 participant,
372 } => RoomEvent::TrackPublished {
373 publication: RemoteTrackPublication(publication),
374 participant: RemoteParticipant(participant),
375 },
376 livekit::RoomEvent::TrackUnpublished {
377 publication,
378 participant,
379 } => RoomEvent::TrackUnpublished {
380 publication: RemoteTrackPublication(publication),
381 participant: RemoteParticipant(participant),
382 },
383 livekit::RoomEvent::TrackMuted {
384 participant,
385 publication,
386 } => RoomEvent::TrackMuted {
387 publication: publication_from_livekit(publication),
388 participant: participant_from_livekit(participant),
389 },
390 livekit::RoomEvent::TrackUnmuted {
391 participant,
392 publication,
393 } => RoomEvent::TrackUnmuted {
394 publication: publication_from_livekit(publication),
395 participant: participant_from_livekit(participant),
396 },
397 livekit::RoomEvent::RoomMetadataChanged {
398 old_metadata,
399 metadata,
400 } => RoomEvent::RoomMetadataChanged {
401 old_metadata,
402 metadata,
403 },
404 livekit::RoomEvent::ParticipantMetadataChanged {
405 participant,
406 old_metadata,
407 metadata,
408 } => RoomEvent::ParticipantMetadataChanged {
409 participant: participant_from_livekit(participant),
410 old_metadata,
411 metadata,
412 },
413 livekit::RoomEvent::ParticipantNameChanged {
414 participant,
415 old_name,
416 name,
417 } => RoomEvent::ParticipantNameChanged {
418 participant: participant_from_livekit(participant),
419 old_name,
420 name,
421 },
422 livekit::RoomEvent::ParticipantAttributesChanged {
423 participant,
424 changed_attributes,
425 } => RoomEvent::ParticipantAttributesChanged {
426 participant: participant_from_livekit(participant),
427 changed_attributes: changed_attributes.into_iter().collect(),
428 },
429 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
430 RoomEvent::ActiveSpeakersChanged {
431 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
432 }
433 }
434 livekit::RoomEvent::Connected {
435 participants_with_tracks,
436 } => RoomEvent::Connected {
437 participants_with_tracks: participants_with_tracks
438 .into_iter()
439 .map({
440 |(p, t)| {
441 (
442 RemoteParticipant(p),
443 t.into_iter().map(RemoteTrackPublication).collect(),
444 )
445 }
446 })
447 .collect(),
448 },
449 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
450 reason: reason.as_str_name(),
451 },
452 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
453 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
454 _ => {
455 log::trace!("dropping livekit event: {:?}", event);
456 return None;
457 }
458 };
459
460 Some(event)
461}