1use std::sync::Arc;
2
3use anyhow::{Context as _, Result};
4use collections::HashMap;
5use futures::{SinkExt, channel::mpsc};
6use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
7use gpui_tokio::Tokio;
8use playback::capture_local_video_track;
9
10mod playback;
11
12use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
13pub use playback::AudioStream;
14pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
15
16#[derive(Clone, Debug)]
17pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
18#[derive(Clone, Debug)]
19pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
20#[derive(Clone, Debug)]
21pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
22#[derive(Clone, Debug)]
23pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
24
25#[derive(Clone, Debug)]
26pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
27#[derive(Clone, Debug)]
28pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
29#[derive(Clone, Debug)]
30pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
31#[derive(Clone, Debug)]
32pub struct LocalParticipant(livekit::participant::LocalParticipant);
33
34pub struct Room {
35 room: livekit::Room,
36 _task: Task<()>,
37 playback: playback::AudioStack,
38}
39
40pub type TrackSid = livekit::id::TrackSid;
41pub type ConnectionState = livekit::ConnectionState;
42#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
43pub struct ParticipantIdentity(pub String);
44
45impl Room {
46 pub async fn connect(
47 url: String,
48 token: String,
49 cx: &mut AsyncApp,
50 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
51 let connector =
52 tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
53 let mut config = livekit::RoomOptions::default();
54 config.connector = Some(connector);
55 let (room, mut events) = Tokio::spawn(cx, async move {
56 livekit::Room::connect(&url, &token, config).await
57 })?
58 .await??;
59
60 let (mut tx, rx) = mpsc::unbounded();
61 let task = cx.background_executor().spawn(async move {
62 while let Some(event) = events.recv().await {
63 if let Some(event) = room_event_from_livekit(event) {
64 tx.send(event).await.ok();
65 }
66 }
67 });
68
69 Ok((
70 Self {
71 room,
72 _task: task,
73 playback: playback::AudioStack::new(cx.background_executor().clone()),
74 },
75 rx,
76 ))
77 }
78
79 pub fn local_participant(&self) -> LocalParticipant {
80 LocalParticipant(self.room.local_participant())
81 }
82
83 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
84 self.room
85 .remote_participants()
86 .into_iter()
87 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
88 .collect()
89 }
90
91 pub fn connection_state(&self) -> ConnectionState {
92 self.room.connection_state()
93 }
94
95 pub async fn publish_local_microphone_track(
96 &self,
97 cx: &mut AsyncApp,
98 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
99 let (track, stream) = self.playback.capture_local_microphone_track()?;
100 let publication = self
101 .local_participant()
102 .publish_track(
103 livekit::track::LocalTrack::Audio(track.0),
104 livekit::options::TrackPublishOptions {
105 source: livekit::track::TrackSource::Microphone,
106 ..Default::default()
107 },
108 cx,
109 )
110 .await?;
111
112 Ok((publication, stream))
113 }
114
115 pub async fn unpublish_local_track(
116 &self,
117 sid: TrackSid,
118 cx: &mut AsyncApp,
119 ) -> Result<LocalTrackPublication> {
120 self.local_participant().unpublish_track(sid, cx).await
121 }
122
123 pub fn play_remote_audio_track(
124 &self,
125 track: &RemoteAudioTrack,
126 _cx: &App,
127 ) -> Result<playback::AudioStream> {
128 Ok(self.playback.play_remote_audio_track(&track.0))
129 }
130}
131
132impl LocalParticipant {
133 pub async fn publish_screenshare_track(
134 &self,
135 source: &dyn ScreenCaptureSource,
136 cx: &mut AsyncApp,
137 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
138 let (track, stream) = capture_local_video_track(source, cx).await?;
139 let options = livekit::options::TrackPublishOptions {
140 source: livekit::track::TrackSource::Screenshare,
141 video_codec: livekit::options::VideoCodec::VP8,
142 ..Default::default()
143 };
144 let publication = self
145 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
146 .await?;
147
148 Ok((publication, stream))
149 }
150
151 async fn publish_track(
152 &self,
153 track: livekit::track::LocalTrack,
154 options: livekit::options::TrackPublishOptions,
155 cx: &mut AsyncApp,
156 ) -> Result<LocalTrackPublication> {
157 let participant = self.0.clone();
158 Tokio::spawn(cx, async move {
159 participant.publish_track(track, options).await
160 })?
161 .await?
162 .map(LocalTrackPublication)
163 .context("publishing a track")
164 }
165
166 pub async fn unpublish_track(
167 &self,
168 sid: TrackSid,
169 cx: &mut AsyncApp,
170 ) -> Result<LocalTrackPublication> {
171 let participant = self.0.clone();
172 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
173 .await?
174 .map(LocalTrackPublication)
175 .context("unpublishing a track")
176 }
177}
178
179impl LocalTrackPublication {
180 pub fn mute(&self, cx: &App) {
181 let track = self.0.clone();
182 Tokio::spawn(cx, async move {
183 track.mute();
184 })
185 .detach();
186 }
187
188 pub fn unmute(&self, cx: &App) {
189 let track = self.0.clone();
190 Tokio::spawn(cx, async move {
191 track.unmute();
192 })
193 .detach();
194 }
195
196 pub fn sid(&self) -> TrackSid {
197 self.0.sid()
198 }
199
200 pub fn is_muted(&self) -> bool {
201 self.0.is_muted()
202 }
203}
204
205impl RemoteParticipant {
206 pub fn identity(&self) -> ParticipantIdentity {
207 ParticipantIdentity(self.0.identity().0)
208 }
209
210 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
211 self.0
212 .track_publications()
213 .into_iter()
214 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
215 .collect()
216 }
217}
218
219impl RemoteAudioTrack {
220 pub fn sid(&self) -> TrackSid {
221 self.0.sid()
222 }
223}
224
225impl RemoteVideoTrack {
226 pub fn sid(&self) -> TrackSid {
227 self.0.sid()
228 }
229}
230
231impl RemoteTrackPublication {
232 pub fn is_muted(&self) -> bool {
233 self.0.is_muted()
234 }
235
236 pub fn is_enabled(&self) -> bool {
237 self.0.is_enabled()
238 }
239
240 pub fn track(&self) -> Option<RemoteTrack> {
241 self.0.track().map(remote_track_from_livekit)
242 }
243
244 pub fn is_audio(&self) -> bool {
245 self.0.kind() == livekit::track::TrackKind::Audio
246 }
247
248 pub fn set_enabled(&self, enabled: bool, cx: &App) {
249 let track = self.0.clone();
250 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
251 }
252
253 pub fn sid(&self) -> TrackSid {
254 self.0.sid()
255 }
256}
257
258impl Participant {
259 pub fn identity(&self) -> ParticipantIdentity {
260 match self {
261 Participant::Local(local_participant) => {
262 ParticipantIdentity(local_participant.0.identity().0)
263 }
264 Participant::Remote(remote_participant) => {
265 ParticipantIdentity(remote_participant.0.identity().0)
266 }
267 }
268 }
269}
270
271fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
272 match participant {
273 livekit::participant::Participant::Local(local) => {
274 Participant::Local(LocalParticipant(local))
275 }
276 livekit::participant::Participant::Remote(remote) => {
277 Participant::Remote(RemoteParticipant(remote))
278 }
279 }
280}
281
282fn publication_from_livekit(
283 publication: livekit::publication::TrackPublication,
284) -> TrackPublication {
285 match publication {
286 livekit::publication::TrackPublication::Local(local) => {
287 TrackPublication::Local(LocalTrackPublication(local))
288 }
289 livekit::publication::TrackPublication::Remote(remote) => {
290 TrackPublication::Remote(RemoteTrackPublication(remote))
291 }
292 }
293}
294
295fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
296 match track {
297 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
298 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
299 }
300}
301
302fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
303 match track {
304 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
305 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
306 }
307}
308fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
309 let event = match event {
310 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
311 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
312 }
313 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
314 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
315 }
316 livekit::RoomEvent::LocalTrackPublished {
317 publication,
318 track,
319 participant,
320 } => RoomEvent::LocalTrackPublished {
321 publication: LocalTrackPublication(publication),
322 track: local_track_from_livekit(track),
323 participant: LocalParticipant(participant),
324 },
325 livekit::RoomEvent::LocalTrackUnpublished {
326 publication,
327 participant,
328 } => RoomEvent::LocalTrackUnpublished {
329 publication: LocalTrackPublication(publication),
330 participant: LocalParticipant(participant),
331 },
332 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
333 track: local_track_from_livekit(track),
334 },
335 livekit::RoomEvent::TrackSubscribed {
336 track,
337 publication,
338 participant,
339 } => RoomEvent::TrackSubscribed {
340 track: remote_track_from_livekit(track),
341 publication: RemoteTrackPublication(publication),
342 participant: RemoteParticipant(participant),
343 },
344 livekit::RoomEvent::TrackUnsubscribed {
345 track,
346 publication,
347 participant,
348 } => RoomEvent::TrackUnsubscribed {
349 track: remote_track_from_livekit(track),
350 publication: RemoteTrackPublication(publication),
351 participant: RemoteParticipant(participant),
352 },
353 livekit::RoomEvent::TrackSubscriptionFailed {
354 participant,
355 error: _,
356 track_sid,
357 } => RoomEvent::TrackSubscriptionFailed {
358 participant: RemoteParticipant(participant),
359 track_sid,
360 },
361 livekit::RoomEvent::TrackPublished {
362 publication,
363 participant,
364 } => RoomEvent::TrackPublished {
365 publication: RemoteTrackPublication(publication),
366 participant: RemoteParticipant(participant),
367 },
368 livekit::RoomEvent::TrackUnpublished {
369 publication,
370 participant,
371 } => RoomEvent::TrackUnpublished {
372 publication: RemoteTrackPublication(publication),
373 participant: RemoteParticipant(participant),
374 },
375 livekit::RoomEvent::TrackMuted {
376 participant,
377 publication,
378 } => RoomEvent::TrackMuted {
379 publication: publication_from_livekit(publication),
380 participant: participant_from_livekit(participant),
381 },
382 livekit::RoomEvent::TrackUnmuted {
383 participant,
384 publication,
385 } => RoomEvent::TrackUnmuted {
386 publication: publication_from_livekit(publication),
387 participant: participant_from_livekit(participant),
388 },
389 livekit::RoomEvent::RoomMetadataChanged {
390 old_metadata,
391 metadata,
392 } => RoomEvent::RoomMetadataChanged {
393 old_metadata,
394 metadata,
395 },
396 livekit::RoomEvent::ParticipantMetadataChanged {
397 participant,
398 old_metadata,
399 metadata,
400 } => RoomEvent::ParticipantMetadataChanged {
401 participant: participant_from_livekit(participant),
402 old_metadata,
403 metadata,
404 },
405 livekit::RoomEvent::ParticipantNameChanged {
406 participant,
407 old_name,
408 name,
409 } => RoomEvent::ParticipantNameChanged {
410 participant: participant_from_livekit(participant),
411 old_name,
412 name,
413 },
414 livekit::RoomEvent::ParticipantAttributesChanged {
415 participant,
416 changed_attributes,
417 } => RoomEvent::ParticipantAttributesChanged {
418 participant: participant_from_livekit(participant),
419 changed_attributes: changed_attributes.into_iter().collect(),
420 },
421 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
422 RoomEvent::ActiveSpeakersChanged {
423 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
424 }
425 }
426 livekit::RoomEvent::Connected {
427 participants_with_tracks,
428 } => RoomEvent::Connected {
429 participants_with_tracks: participants_with_tracks
430 .into_iter()
431 .map({
432 |(p, t)| {
433 (
434 RemoteParticipant(p),
435 t.into_iter().map(RemoteTrackPublication).collect(),
436 )
437 }
438 })
439 .collect(),
440 },
441 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
442 reason: reason.as_str_name(),
443 },
444 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
445 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
446 _ => {
447 log::trace!("dropping livekit event: {:?}", event);
448 return None;
449 }
450 };
451
452 Some(event)
453}