1use std::sync::Arc;
2
3use anyhow::{Context as _, Result};
4use collections::HashMap;
5use futures::{SinkExt, channel::mpsc};
6use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
7use gpui_tokio::Tokio;
8use playback::capture_local_video_track;
9
10mod playback;
11#[cfg(feature = "record-microphone")]
12mod record;
13
14use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
15pub use playback::AudioStream;
16pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
17
18#[derive(Clone, Debug)]
19pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
20#[derive(Clone, Debug)]
21pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
22#[derive(Clone, Debug)]
23pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
24#[derive(Clone, Debug)]
25pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
26
27#[derive(Clone, Debug)]
28pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
29#[derive(Clone, Debug)]
30pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
31#[derive(Clone, Debug)]
32pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
33#[derive(Clone, Debug)]
34pub struct LocalParticipant(livekit::participant::LocalParticipant);
35
36pub struct Room {
37 room: livekit::Room,
38 _task: Task<()>,
39 playback: playback::AudioStack,
40}
41
42pub type TrackSid = livekit::id::TrackSid;
43pub type ConnectionState = livekit::ConnectionState;
44#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
45pub struct ParticipantIdentity(pub String);
46
47impl Room {
48 pub async fn connect(
49 url: String,
50 token: String,
51 cx: &mut AsyncApp,
52 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
53 let connector =
54 tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
55 let mut config = livekit::RoomOptions::default();
56 config.connector = Some(connector);
57 let (room, mut events) = Tokio::spawn(cx, async move {
58 livekit::Room::connect(&url, &token, config).await
59 })?
60 .await??;
61
62 let (mut tx, rx) = mpsc::unbounded();
63 let task = cx.background_executor().spawn(async move {
64 while let Some(event) = events.recv().await {
65 if let Some(event) = room_event_from_livekit(event) {
66 tx.send(event).await.ok();
67 }
68 }
69 });
70
71 Ok((
72 Self {
73 room,
74 _task: task,
75 playback: playback::AudioStack::new(cx.background_executor().clone()),
76 },
77 rx,
78 ))
79 }
80
81 pub fn local_participant(&self) -> LocalParticipant {
82 LocalParticipant(self.room.local_participant())
83 }
84
85 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
86 self.room
87 .remote_participants()
88 .into_iter()
89 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
90 .collect()
91 }
92
93 pub fn connection_state(&self) -> ConnectionState {
94 self.room.connection_state()
95 }
96
97 pub async fn publish_local_microphone_track(
98 &self,
99 cx: &mut AsyncApp,
100 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
101 let (track, stream) = self.playback.capture_local_microphone_track()?;
102 let publication = self
103 .local_participant()
104 .publish_track(
105 livekit::track::LocalTrack::Audio(track.0),
106 livekit::options::TrackPublishOptions {
107 source: livekit::track::TrackSource::Microphone,
108 ..Default::default()
109 },
110 cx,
111 )
112 .await?;
113
114 Ok((publication, stream))
115 }
116
117 pub async fn unpublish_local_track(
118 &self,
119 sid: TrackSid,
120 cx: &mut AsyncApp,
121 ) -> Result<LocalTrackPublication> {
122 self.local_participant().unpublish_track(sid, cx).await
123 }
124
125 pub fn play_remote_audio_track(
126 &self,
127 track: &RemoteAudioTrack,
128 _cx: &App,
129 ) -> Result<playback::AudioStream> {
130 Ok(self.playback.play_remote_audio_track(&track.0))
131 }
132}
133
134impl LocalParticipant {
135 pub async fn publish_screenshare_track(
136 &self,
137 source: &dyn ScreenCaptureSource,
138 cx: &mut AsyncApp,
139 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
140 let (track, stream) = capture_local_video_track(source, cx).await?;
141 let options = livekit::options::TrackPublishOptions {
142 source: livekit::track::TrackSource::Screenshare,
143 video_codec: livekit::options::VideoCodec::VP8,
144 ..Default::default()
145 };
146 let publication = self
147 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
148 .await?;
149
150 Ok((publication, stream))
151 }
152
153 async fn publish_track(
154 &self,
155 track: livekit::track::LocalTrack,
156 options: livekit::options::TrackPublishOptions,
157 cx: &mut AsyncApp,
158 ) -> Result<LocalTrackPublication> {
159 let participant = self.0.clone();
160 Tokio::spawn(cx, async move {
161 participant.publish_track(track, options).await
162 })?
163 .await?
164 .map(LocalTrackPublication)
165 .context("publishing a track")
166 }
167
168 pub async fn unpublish_track(
169 &self,
170 sid: TrackSid,
171 cx: &mut AsyncApp,
172 ) -> Result<LocalTrackPublication> {
173 let participant = self.0.clone();
174 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
175 .await?
176 .map(LocalTrackPublication)
177 .context("unpublishing a track")
178 }
179}
180
181impl LocalTrackPublication {
182 pub fn mute(&self, cx: &App) {
183 let track = self.0.clone();
184 Tokio::spawn(cx, async move {
185 track.mute();
186 })
187 .detach();
188 }
189
190 pub fn unmute(&self, cx: &App) {
191 let track = self.0.clone();
192 Tokio::spawn(cx, async move {
193 track.unmute();
194 })
195 .detach();
196 }
197
198 pub fn sid(&self) -> TrackSid {
199 self.0.sid()
200 }
201
202 pub fn is_muted(&self) -> bool {
203 self.0.is_muted()
204 }
205}
206
207impl RemoteParticipant {
208 pub fn identity(&self) -> ParticipantIdentity {
209 ParticipantIdentity(self.0.identity().0)
210 }
211
212 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
213 self.0
214 .track_publications()
215 .into_iter()
216 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
217 .collect()
218 }
219}
220
221impl RemoteAudioTrack {
222 pub fn sid(&self) -> TrackSid {
223 self.0.sid()
224 }
225}
226
227impl RemoteVideoTrack {
228 pub fn sid(&self) -> TrackSid {
229 self.0.sid()
230 }
231}
232
233impl RemoteTrackPublication {
234 pub fn is_muted(&self) -> bool {
235 self.0.is_muted()
236 }
237
238 pub fn is_enabled(&self) -> bool {
239 self.0.is_enabled()
240 }
241
242 pub fn track(&self) -> Option<RemoteTrack> {
243 self.0.track().map(remote_track_from_livekit)
244 }
245
246 pub fn is_audio(&self) -> bool {
247 self.0.kind() == livekit::track::TrackKind::Audio
248 }
249
250 pub fn set_enabled(&self, enabled: bool, cx: &App) {
251 let track = self.0.clone();
252 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
253 }
254
255 pub fn sid(&self) -> TrackSid {
256 self.0.sid()
257 }
258}
259
260impl Participant {
261 pub fn identity(&self) -> ParticipantIdentity {
262 match self {
263 Participant::Local(local_participant) => {
264 ParticipantIdentity(local_participant.0.identity().0)
265 }
266 Participant::Remote(remote_participant) => {
267 ParticipantIdentity(remote_participant.0.identity().0)
268 }
269 }
270 }
271}
272
273fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
274 match participant {
275 livekit::participant::Participant::Local(local) => {
276 Participant::Local(LocalParticipant(local))
277 }
278 livekit::participant::Participant::Remote(remote) => {
279 Participant::Remote(RemoteParticipant(remote))
280 }
281 }
282}
283
284fn publication_from_livekit(
285 publication: livekit::publication::TrackPublication,
286) -> TrackPublication {
287 match publication {
288 livekit::publication::TrackPublication::Local(local) => {
289 TrackPublication::Local(LocalTrackPublication(local))
290 }
291 livekit::publication::TrackPublication::Remote(remote) => {
292 TrackPublication::Remote(RemoteTrackPublication(remote))
293 }
294 }
295}
296
297fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
298 match track {
299 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
300 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
301 }
302}
303
304fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
305 match track {
306 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
307 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
308 }
309}
310fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
311 let event = match event {
312 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
313 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
314 }
315 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
316 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
317 }
318 livekit::RoomEvent::LocalTrackPublished {
319 publication,
320 track,
321 participant,
322 } => RoomEvent::LocalTrackPublished {
323 publication: LocalTrackPublication(publication),
324 track: local_track_from_livekit(track),
325 participant: LocalParticipant(participant),
326 },
327 livekit::RoomEvent::LocalTrackUnpublished {
328 publication,
329 participant,
330 } => RoomEvent::LocalTrackUnpublished {
331 publication: LocalTrackPublication(publication),
332 participant: LocalParticipant(participant),
333 },
334 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
335 track: local_track_from_livekit(track),
336 },
337 livekit::RoomEvent::TrackSubscribed {
338 track,
339 publication,
340 participant,
341 } => RoomEvent::TrackSubscribed {
342 track: remote_track_from_livekit(track),
343 publication: RemoteTrackPublication(publication),
344 participant: RemoteParticipant(participant),
345 },
346 livekit::RoomEvent::TrackUnsubscribed {
347 track,
348 publication,
349 participant,
350 } => RoomEvent::TrackUnsubscribed {
351 track: remote_track_from_livekit(track),
352 publication: RemoteTrackPublication(publication),
353 participant: RemoteParticipant(participant),
354 },
355 livekit::RoomEvent::TrackSubscriptionFailed {
356 participant,
357 error: _,
358 track_sid,
359 } => RoomEvent::TrackSubscriptionFailed {
360 participant: RemoteParticipant(participant),
361 track_sid,
362 },
363 livekit::RoomEvent::TrackPublished {
364 publication,
365 participant,
366 } => RoomEvent::TrackPublished {
367 publication: RemoteTrackPublication(publication),
368 participant: RemoteParticipant(participant),
369 },
370 livekit::RoomEvent::TrackUnpublished {
371 publication,
372 participant,
373 } => RoomEvent::TrackUnpublished {
374 publication: RemoteTrackPublication(publication),
375 participant: RemoteParticipant(participant),
376 },
377 livekit::RoomEvent::TrackMuted {
378 participant,
379 publication,
380 } => RoomEvent::TrackMuted {
381 publication: publication_from_livekit(publication),
382 participant: participant_from_livekit(participant),
383 },
384 livekit::RoomEvent::TrackUnmuted {
385 participant,
386 publication,
387 } => RoomEvent::TrackUnmuted {
388 publication: publication_from_livekit(publication),
389 participant: participant_from_livekit(participant),
390 },
391 livekit::RoomEvent::RoomMetadataChanged {
392 old_metadata,
393 metadata,
394 } => RoomEvent::RoomMetadataChanged {
395 old_metadata,
396 metadata,
397 },
398 livekit::RoomEvent::ParticipantMetadataChanged {
399 participant,
400 old_metadata,
401 metadata,
402 } => RoomEvent::ParticipantMetadataChanged {
403 participant: participant_from_livekit(participant),
404 old_metadata,
405 metadata,
406 },
407 livekit::RoomEvent::ParticipantNameChanged {
408 participant,
409 old_name,
410 name,
411 } => RoomEvent::ParticipantNameChanged {
412 participant: participant_from_livekit(participant),
413 old_name,
414 name,
415 },
416 livekit::RoomEvent::ParticipantAttributesChanged {
417 participant,
418 changed_attributes,
419 } => RoomEvent::ParticipantAttributesChanged {
420 participant: participant_from_livekit(participant),
421 changed_attributes: changed_attributes.into_iter().collect(),
422 },
423 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
424 RoomEvent::ActiveSpeakersChanged {
425 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
426 }
427 }
428 livekit::RoomEvent::Connected {
429 participants_with_tracks,
430 } => RoomEvent::Connected {
431 participants_with_tracks: participants_with_tracks
432 .into_iter()
433 .map({
434 |(p, t)| {
435 (
436 RemoteParticipant(p),
437 t.into_iter().map(RemoteTrackPublication).collect(),
438 )
439 }
440 })
441 .collect(),
442 },
443 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
444 reason: reason.as_str_name(),
445 },
446 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
447 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
448 _ => {
449 log::trace!("dropping livekit event: {:?}", event);
450 return None;
451 }
452 };
453
454 Some(event)
455}