1use std::sync::Arc;
2
3use anyhow::Result;
4use collections::HashMap;
5use futures::{SinkExt, channel::mpsc};
6use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
7use gpui_tokio::Tokio;
8use playback::capture_local_video_track;
9
10mod playback;
11
12use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
13pub use playback::AudioStream;
14pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
15
16#[derive(Clone, Debug)]
17pub struct RemoteVideoTrack(livekit::track::RemoteVideoTrack);
18#[derive(Clone, Debug)]
19pub struct RemoteAudioTrack(livekit::track::RemoteAudioTrack);
20#[derive(Clone, Debug)]
21pub struct RemoteTrackPublication(livekit::publication::RemoteTrackPublication);
22#[derive(Clone, Debug)]
23pub struct RemoteParticipant(livekit::participant::RemoteParticipant);
24
25#[derive(Clone, Debug)]
26pub struct LocalVideoTrack(livekit::track::LocalVideoTrack);
27#[derive(Clone, Debug)]
28pub struct LocalAudioTrack(livekit::track::LocalAudioTrack);
29#[derive(Clone, Debug)]
30pub struct LocalTrackPublication(livekit::publication::LocalTrackPublication);
31#[derive(Clone, Debug)]
32pub struct LocalParticipant(livekit::participant::LocalParticipant);
33
34pub struct Room {
35 room: livekit::Room,
36 _task: Task<()>,
37 playback: playback::AudioStack,
38}
39
40pub type TrackSid = livekit::id::TrackSid;
41pub type ConnectionState = livekit::ConnectionState;
42#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
43pub struct ParticipantIdentity(pub String);
44
45impl Room {
46 pub async fn connect(
47 url: String,
48 token: String,
49 cx: &mut AsyncApp,
50 ) -> Result<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
51 let connector =
52 tokio_tungstenite::Connector::Rustls(Arc::new(http_client_tls::tls_config()));
53 let mut config = livekit::RoomOptions::default();
54 config.connector = Some(connector);
55 let (room, mut events) = Tokio::spawn(cx, async move {
56 livekit::Room::connect(&url, &token, config).await
57 })?
58 .await??;
59
60 let (mut tx, rx) = mpsc::unbounded();
61 let task = cx.background_executor().spawn(async move {
62 while let Some(event) = events.recv().await {
63 if let Some(event) = room_event_from_livekit(event) {
64 tx.send(event.into()).await.ok();
65 }
66 }
67 });
68
69 Ok((
70 Self {
71 room,
72 _task: task,
73 playback: playback::AudioStack::new(cx.background_executor().clone()),
74 },
75 rx,
76 ))
77 }
78
79 pub fn local_participant(&self) -> LocalParticipant {
80 LocalParticipant(self.room.local_participant())
81 }
82
83 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
84 self.room
85 .remote_participants()
86 .into_iter()
87 .map(|(k, v)| (ParticipantIdentity(k.0), RemoteParticipant(v)))
88 .collect()
89 }
90
91 pub fn connection_state(&self) -> ConnectionState {
92 self.room.connection_state()
93 }
94
95 pub async fn publish_local_microphone_track(
96 &self,
97 cx: &mut AsyncApp,
98 ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
99 let (track, stream) = self.playback.capture_local_microphone_track()?;
100 let publication = self
101 .local_participant()
102 .publish_track(
103 livekit::track::LocalTrack::Audio(track.0),
104 livekit::options::TrackPublishOptions {
105 source: livekit::track::TrackSource::Microphone,
106 ..Default::default()
107 },
108 cx,
109 )
110 .await?;
111
112 Ok((publication, stream))
113 }
114
115 // pub async fn publish_local_wav_track(
116 // &self,
117 // cx: &mut AsyncApp,
118 // ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
119 // let apm = self.apm.clone();
120 // let executor = cx.background_executor().clone();
121 // let (track, stream) =
122 // Tokio::spawn(
123 // cx,
124 // async move { capture_local_wav_track(apm, &executor).await },
125 // )?
126 // .await??;
127 // let publication = self
128 // .local_participant()
129 // .publish_track(
130 // livekit::track::LocalTrack::Audio(track.0),
131 // livekit::options::TrackPublishOptions {
132 // source: livekit::track::TrackSource::Microphone,
133 // ..Default::default()
134 // },
135 // cx,
136 // )
137 // .await?;
138
139 // Ok((publication, stream))
140 // }
141
142 pub async fn unpublish_local_track(
143 &self,
144 sid: TrackSid,
145 cx: &mut AsyncApp,
146 ) -> Result<LocalTrackPublication> {
147 self.local_participant().unpublish_track(sid, cx).await
148 }
149
150 pub fn play_remote_audio_track(
151 &self,
152 track: &RemoteAudioTrack,
153 _cx: &App,
154 ) -> Result<playback::AudioStream> {
155 Ok(self.playback.play_remote_audio_track(&track.0))
156 }
157}
158
159impl LocalParticipant {
160 pub async fn publish_screenshare_track(
161 &self,
162 source: &dyn ScreenCaptureSource,
163 cx: &mut AsyncApp,
164 ) -> Result<(LocalTrackPublication, Box<dyn ScreenCaptureStream>)> {
165 let (track, stream) = capture_local_video_track(&*source, cx).await?;
166 let options = livekit::options::TrackPublishOptions {
167 source: livekit::track::TrackSource::Screenshare,
168 video_codec: livekit::options::VideoCodec::VP8,
169 ..Default::default()
170 };
171 let publication = self
172 .publish_track(livekit::track::LocalTrack::Video(track.0), options, cx)
173 .await?;
174
175 Ok((publication, stream))
176 }
177
178 async fn publish_track(
179 &self,
180 track: livekit::track::LocalTrack,
181 options: livekit::options::TrackPublishOptions,
182 cx: &mut AsyncApp,
183 ) -> Result<LocalTrackPublication> {
184 let participant = self.0.clone();
185 Tokio::spawn(cx, async move {
186 participant.publish_track(track, options).await
187 })?
188 .await?
189 .map(|p| LocalTrackPublication(p))
190 .map_err(|error| anyhow::anyhow!("failed to publish track: {error}"))
191 }
192
193 pub async fn unpublish_track(
194 &self,
195 sid: TrackSid,
196 cx: &mut AsyncApp,
197 ) -> Result<LocalTrackPublication> {
198 let participant = self.0.clone();
199 Tokio::spawn(cx, async move { participant.unpublish_track(&sid).await })?
200 .await?
201 .map(|p| LocalTrackPublication(p))
202 .map_err(|error| anyhow::anyhow!("failed to unpublish track: {error}"))
203 }
204}
205
206impl LocalTrackPublication {
207 pub fn mute(&self, cx: &App) {
208 let track = self.0.clone();
209 Tokio::spawn(cx, async move {
210 track.mute();
211 })
212 .detach();
213 }
214
215 pub fn unmute(&self, cx: &App) {
216 let track = self.0.clone();
217 Tokio::spawn(cx, async move {
218 track.unmute();
219 })
220 .detach();
221 }
222
223 pub fn sid(&self) -> TrackSid {
224 self.0.sid()
225 }
226
227 pub fn is_muted(&self) -> bool {
228 self.0.is_muted()
229 }
230}
231
232impl RemoteParticipant {
233 pub fn identity(&self) -> ParticipantIdentity {
234 ParticipantIdentity(self.0.identity().0)
235 }
236
237 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
238 self.0
239 .track_publications()
240 .into_iter()
241 .map(|(sid, publication)| (sid, RemoteTrackPublication(publication)))
242 .collect()
243 }
244}
245
246impl RemoteAudioTrack {
247 pub fn sid(&self) -> TrackSid {
248 self.0.sid()
249 }
250}
251
252impl RemoteVideoTrack {
253 pub fn sid(&self) -> TrackSid {
254 self.0.sid()
255 }
256}
257
258impl RemoteTrackPublication {
259 pub fn is_muted(&self) -> bool {
260 self.0.is_muted()
261 }
262
263 pub fn is_enabled(&self) -> bool {
264 self.0.is_enabled()
265 }
266
267 pub fn track(&self) -> Option<RemoteTrack> {
268 self.0.track().map(remote_track_from_livekit)
269 }
270
271 pub fn is_audio(&self) -> bool {
272 self.0.kind() == livekit::track::TrackKind::Audio
273 }
274
275 pub fn set_enabled(&self, enabled: bool, cx: &App) {
276 let track = self.0.clone();
277 Tokio::spawn(cx, async move { track.set_enabled(enabled) }).detach();
278 }
279
280 pub fn sid(&self) -> TrackSid {
281 self.0.sid()
282 }
283}
284
285impl RemoteTrack {
286 pub fn set_enabled(&self, enabled: bool, cx: &App) {
287 let this = self.clone();
288 Tokio::spawn(cx, async move {
289 match this {
290 RemoteTrack::Audio(remote_audio_track) => {
291 remote_audio_track.0.rtc_track().set_enabled(enabled)
292 }
293 RemoteTrack::Video(remote_video_track) => {
294 remote_video_track.0.rtc_track().set_enabled(enabled)
295 }
296 }
297 })
298 .detach();
299 }
300}
301
302impl Participant {
303 pub fn identity(&self) -> ParticipantIdentity {
304 match self {
305 Participant::Local(local_participant) => {
306 ParticipantIdentity(local_participant.0.identity().0)
307 }
308 Participant::Remote(remote_participant) => {
309 ParticipantIdentity(remote_participant.0.identity().0)
310 }
311 }
312 }
313}
314
315fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
316 match participant {
317 livekit::participant::Participant::Local(local) => {
318 Participant::Local(LocalParticipant(local))
319 }
320 livekit::participant::Participant::Remote(remote) => {
321 Participant::Remote(RemoteParticipant(remote))
322 }
323 }
324}
325
326fn publication_from_livekit(
327 publication: livekit::publication::TrackPublication,
328) -> TrackPublication {
329 match publication {
330 livekit::publication::TrackPublication::Local(local) => {
331 TrackPublication::Local(LocalTrackPublication(local))
332 }
333 livekit::publication::TrackPublication::Remote(remote) => {
334 TrackPublication::Remote(RemoteTrackPublication(remote))
335 }
336 }
337}
338
339fn remote_track_from_livekit(track: livekit::track::RemoteTrack) -> RemoteTrack {
340 match track {
341 livekit::track::RemoteTrack::Audio(audio) => RemoteTrack::Audio(RemoteAudioTrack(audio)),
342 livekit::track::RemoteTrack::Video(video) => RemoteTrack::Video(RemoteVideoTrack(video)),
343 }
344}
345
346fn local_track_from_livekit(track: livekit::track::LocalTrack) -> LocalTrack {
347 match track {
348 livekit::track::LocalTrack::Audio(audio) => LocalTrack::Audio(LocalAudioTrack(audio)),
349 livekit::track::LocalTrack::Video(video) => LocalTrack::Video(LocalVideoTrack(video)),
350 }
351}
352fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
353 let event = match event {
354 livekit::RoomEvent::ParticipantConnected(remote_participant) => {
355 RoomEvent::ParticipantConnected(RemoteParticipant(remote_participant))
356 }
357 livekit::RoomEvent::ParticipantDisconnected(remote_participant) => {
358 RoomEvent::ParticipantDisconnected(RemoteParticipant(remote_participant))
359 }
360 livekit::RoomEvent::LocalTrackPublished {
361 publication,
362 track,
363 participant,
364 } => RoomEvent::LocalTrackPublished {
365 publication: LocalTrackPublication(publication),
366 track: local_track_from_livekit(track),
367 participant: LocalParticipant(participant),
368 },
369 livekit::RoomEvent::LocalTrackUnpublished {
370 publication,
371 participant,
372 } => RoomEvent::LocalTrackUnpublished {
373 publication: LocalTrackPublication(publication),
374 participant: LocalParticipant(participant),
375 },
376 livekit::RoomEvent::LocalTrackSubscribed { track } => RoomEvent::LocalTrackSubscribed {
377 track: local_track_from_livekit(track),
378 },
379 livekit::RoomEvent::TrackSubscribed {
380 track,
381 publication,
382 participant,
383 } => RoomEvent::TrackSubscribed {
384 track: remote_track_from_livekit(track),
385 publication: RemoteTrackPublication(publication),
386 participant: RemoteParticipant(participant),
387 },
388 livekit::RoomEvent::TrackUnsubscribed {
389 track,
390 publication,
391 participant,
392 } => RoomEvent::TrackUnsubscribed {
393 track: remote_track_from_livekit(track),
394 publication: RemoteTrackPublication(publication),
395 participant: RemoteParticipant(participant),
396 },
397 livekit::RoomEvent::TrackSubscriptionFailed {
398 participant,
399 error: _,
400 track_sid,
401 } => RoomEvent::TrackSubscriptionFailed {
402 participant: RemoteParticipant(participant),
403 track_sid,
404 },
405 livekit::RoomEvent::TrackPublished {
406 publication,
407 participant,
408 } => RoomEvent::TrackPublished {
409 publication: RemoteTrackPublication(publication),
410 participant: RemoteParticipant(participant),
411 },
412 livekit::RoomEvent::TrackUnpublished {
413 publication,
414 participant,
415 } => RoomEvent::TrackUnpublished {
416 publication: RemoteTrackPublication(publication),
417 participant: RemoteParticipant(participant),
418 },
419 livekit::RoomEvent::TrackMuted {
420 participant,
421 publication,
422 } => RoomEvent::TrackMuted {
423 publication: publication_from_livekit(publication),
424 participant: participant_from_livekit(participant),
425 },
426 livekit::RoomEvent::TrackUnmuted {
427 participant,
428 publication,
429 } => RoomEvent::TrackUnmuted {
430 publication: publication_from_livekit(publication),
431 participant: participant_from_livekit(participant),
432 },
433 livekit::RoomEvent::RoomMetadataChanged {
434 old_metadata,
435 metadata,
436 } => RoomEvent::RoomMetadataChanged {
437 old_metadata,
438 metadata,
439 },
440 livekit::RoomEvent::ParticipantMetadataChanged {
441 participant,
442 old_metadata,
443 metadata,
444 } => RoomEvent::ParticipantMetadataChanged {
445 participant: participant_from_livekit(participant),
446 old_metadata,
447 metadata,
448 },
449 livekit::RoomEvent::ParticipantNameChanged {
450 participant,
451 old_name,
452 name,
453 } => RoomEvent::ParticipantNameChanged {
454 participant: participant_from_livekit(participant),
455 old_name,
456 name,
457 },
458 livekit::RoomEvent::ParticipantAttributesChanged {
459 participant,
460 changed_attributes,
461 } => RoomEvent::ParticipantAttributesChanged {
462 participant: participant_from_livekit(participant),
463 changed_attributes: changed_attributes.into_iter().collect(),
464 },
465 livekit::RoomEvent::ActiveSpeakersChanged { speakers } => {
466 RoomEvent::ActiveSpeakersChanged {
467 speakers: speakers.into_iter().map(participant_from_livekit).collect(),
468 }
469 }
470 livekit::RoomEvent::Connected {
471 participants_with_tracks,
472 } => RoomEvent::Connected {
473 participants_with_tracks: participants_with_tracks
474 .into_iter()
475 .map({
476 |(p, t)| {
477 (
478 RemoteParticipant(p),
479 t.into_iter().map(|t| RemoteTrackPublication(t)).collect(),
480 )
481 }
482 })
483 .collect(),
484 },
485 livekit::RoomEvent::Disconnected { reason } => RoomEvent::Disconnected {
486 reason: reason.as_str_name(),
487 },
488 livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
489 livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
490 _ => {
491 log::trace!("dropping livekit event: {:?}", event);
492 return None;
493 }
494 };
495
496 Some(event)
497}