1use anyhow::{anyhow, Context, Result};
2use core_foundation::{
3 array::{CFArray, CFArrayRef},
4 base::{CFRelease, CFRetain, TCFType},
5 string::{CFString, CFStringRef},
6};
7use futures::{
8 channel::{mpsc, oneshot},
9 Future,
10};
11pub use media::core_video::CVImageBuffer;
12use media::core_video::CVImageBufferRef;
13use parking_lot::Mutex;
14use postage::watch;
15use std::{
16 ffi::c_void,
17 sync::{Arc, Weak},
18};
19
20extern "C" {
21 fn LKRoomDelegateCreate(
22 callback_data: *mut c_void,
23 on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
24 on_did_subscribe_to_remote_audio_track: extern "C" fn(
25 callback_data: *mut c_void,
26 publisher_id: CFStringRef,
27 track_id: CFStringRef,
28 remote_track: *const c_void,
29 ),
30 on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
31 callback_data: *mut c_void,
32 publisher_id: CFStringRef,
33 track_id: CFStringRef,
34 ),
35 on_mute_changed_from_remote_audio_track: extern "C" fn(
36 callback_data: *mut c_void,
37 track_id: CFStringRef,
38 muted: bool,
39 ),
40 on_active_speakers_changed: extern "C" fn(
41 callback_data: *mut c_void,
42 participants: CFArrayRef,
43 ),
44 on_did_subscribe_to_remote_video_track: extern "C" fn(
45 callback_data: *mut c_void,
46 publisher_id: CFStringRef,
47 track_id: CFStringRef,
48 remote_track: *const c_void,
49 ),
50 on_did_unsubscribe_from_remote_video_track: extern "C" fn(
51 callback_data: *mut c_void,
52 publisher_id: CFStringRef,
53 track_id: CFStringRef,
54 ),
55 ) -> *const c_void;
56
57 fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
58 fn LKRoomConnect(
59 room: *const c_void,
60 url: CFStringRef,
61 token: CFStringRef,
62 callback: extern "C" fn(*mut c_void, CFStringRef),
63 callback_data: *mut c_void,
64 );
65 fn LKRoomDisconnect(room: *const c_void);
66 fn LKRoomPublishVideoTrack(
67 room: *const c_void,
68 track: *const c_void,
69 callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
70 callback_data: *mut c_void,
71 );
72 fn LKRoomPublishAudioTrack(
73 room: *const c_void,
74 track: *const c_void,
75 callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
76 callback_data: *mut c_void,
77 );
78 fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
79 fn LKRoomAudioTracksForRemoteParticipant(
80 room: *const c_void,
81 participant_id: CFStringRef,
82 ) -> CFArrayRef;
83
84 fn LKRoomAudioTrackPublicationsForRemoteParticipant(
85 room: *const c_void,
86 participant_id: CFStringRef,
87 ) -> CFArrayRef;
88
89 fn LKRoomVideoTracksForRemoteParticipant(
90 room: *const c_void,
91 participant_id: CFStringRef,
92 ) -> CFArrayRef;
93
94 fn LKVideoRendererCreate(
95 callback_data: *mut c_void,
96 on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
97 on_drop: extern "C" fn(callback_data: *mut c_void),
98 ) -> *const c_void;
99
100 fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef;
101 fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
102 fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
103
104 fn LKDisplaySources(
105 callback_data: *mut c_void,
106 callback: extern "C" fn(
107 callback_data: *mut c_void,
108 sources: CFArrayRef,
109 error: CFStringRef,
110 ),
111 );
112 fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
113 fn LKLocalAudioTrackCreateTrack() -> *const c_void;
114
115 fn LKLocalTrackPublicationSetMute(
116 publication: *const c_void,
117 muted: bool,
118 on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
119 callback_data: *mut c_void,
120 );
121
122 fn LKRemoteTrackPublicationSetEnabled(
123 publication: *const c_void,
124 enabled: bool,
125 on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
126 callback_data: *mut c_void,
127 );
128}
129
130pub type Sid = String;
131
132#[derive(Clone, Eq, PartialEq)]
133pub enum ConnectionState {
134 Disconnected,
135 Connected { url: String, token: String },
136}
137
138pub struct Room {
139 native_room: *const c_void,
140 connection: Mutex<(
141 watch::Sender<ConnectionState>,
142 watch::Receiver<ConnectionState>,
143 )>,
144 remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
145 remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
146 _delegate: RoomDelegate,
147}
148
149impl Room {
150 pub fn new() -> Arc<Self> {
151 Arc::new_cyclic(|weak_room| {
152 let delegate = RoomDelegate::new(weak_room.clone());
153 Self {
154 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
155 connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
156 remote_audio_track_subscribers: Default::default(),
157 remote_video_track_subscribers: Default::default(),
158 _delegate: delegate,
159 }
160 })
161 }
162
163 pub fn status(&self) -> watch::Receiver<ConnectionState> {
164 self.connection.lock().1.clone()
165 }
166
167 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
168 let url = CFString::new(url);
169 let token = CFString::new(token);
170 let (did_connect, tx, rx) = Self::build_done_callback();
171 unsafe {
172 LKRoomConnect(
173 self.native_room,
174 url.as_concrete_TypeRef(),
175 token.as_concrete_TypeRef(),
176 did_connect,
177 tx,
178 )
179 }
180
181 let this = self.clone();
182 let url = url.to_string();
183 let token = token.to_string();
184 async move {
185 rx.await.unwrap().context("error connecting to room")?;
186 *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
187 Ok(())
188 }
189 }
190
191 fn did_disconnect(&self) {
192 *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
193 }
194
195 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
196 extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
197 unsafe {
198 let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
199
200 if sources.is_null() {
201 let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
202 } else {
203 let sources = CFArray::wrap_under_get_rule(sources)
204 .into_iter()
205 .map(|source| MacOSDisplay::new(*source))
206 .collect();
207
208 let _ = tx.send(Ok(sources));
209 }
210 }
211 }
212
213 let (tx, rx) = oneshot::channel();
214
215 unsafe {
216 LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
217 }
218
219 async move { rx.await.unwrap() }
220 }
221
222 pub fn publish_video_track(
223 self: &Arc<Self>,
224 track: &LocalVideoTrack,
225 ) -> impl Future<Output = Result<LocalTrackPublication>> {
226 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
227 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
228 let tx =
229 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
230 if error.is_null() {
231 let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
232 } else {
233 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
234 let _ = tx.send(Err(anyhow!(error)));
235 }
236 }
237 unsafe {
238 LKRoomPublishVideoTrack(
239 self.native_room,
240 track.0,
241 callback,
242 Box::into_raw(Box::new(tx)) as *mut c_void,
243 );
244 }
245 async { rx.await.unwrap().context("error publishing video track") }
246 }
247
248 pub fn publish_audio_track(
249 self: &Arc<Self>,
250 track: &LocalAudioTrack,
251 ) -> impl Future<Output = Result<LocalTrackPublication>> {
252 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
253 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
254 let tx =
255 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
256 if error.is_null() {
257 let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
258 } else {
259 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
260 let _ = tx.send(Err(anyhow!(error)));
261 }
262 }
263 unsafe {
264 LKRoomPublishAudioTrack(
265 self.native_room,
266 track.0,
267 callback,
268 Box::into_raw(Box::new(tx)) as *mut c_void,
269 );
270 }
271 async { rx.await.unwrap().context("error publishing audio track") }
272 }
273
274 pub fn unpublish_track(&self, publication: LocalTrackPublication) {
275 unsafe {
276 LKRoomUnpublishTrack(self.native_room, publication.0);
277 }
278 }
279
280 pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
281 unsafe {
282 let tracks = LKRoomVideoTracksForRemoteParticipant(
283 self.native_room,
284 CFString::new(participant_id).as_concrete_TypeRef(),
285 );
286
287 if tracks.is_null() {
288 Vec::new()
289 } else {
290 let tracks = CFArray::wrap_under_get_rule(tracks);
291 tracks
292 .into_iter()
293 .map(|native_track| {
294 let native_track = *native_track;
295 let id =
296 CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
297 .to_string();
298 Arc::new(RemoteVideoTrack::new(
299 native_track,
300 id,
301 participant_id.into(),
302 ))
303 })
304 .collect()
305 }
306 }
307 }
308
309 pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
310 unsafe {
311 let tracks = LKRoomAudioTracksForRemoteParticipant(
312 self.native_room,
313 CFString::new(participant_id).as_concrete_TypeRef(),
314 );
315
316 if tracks.is_null() {
317 Vec::new()
318 } else {
319 let tracks = CFArray::wrap_under_get_rule(tracks);
320 tracks
321 .into_iter()
322 .map(|native_track| {
323 let native_track = *native_track;
324 let id =
325 CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
326 .to_string();
327 Arc::new(RemoteAudioTrack::new(
328 native_track,
329 id,
330 participant_id.into(),
331 ))
332 })
333 .collect()
334 }
335 }
336 }
337
338 pub fn remote_audio_track_publications(
339 &self,
340 participant_id: &str,
341 ) -> Vec<Arc<RemoteTrackPublication>> {
342 unsafe {
343 let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant(
344 self.native_room,
345 CFString::new(participant_id).as_concrete_TypeRef(),
346 );
347
348 if tracks.is_null() {
349 Vec::new()
350 } else {
351 let tracks = CFArray::wrap_under_get_rule(tracks);
352 tracks
353 .into_iter()
354 .map(|native_track_publication| {
355 let native_track_publication = *native_track_publication;
356 Arc::new(RemoteTrackPublication::new(native_track_publication))
357 })
358 .collect()
359 }
360 }
361 }
362
363 pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
364 let (tx, rx) = mpsc::unbounded();
365 self.remote_audio_track_subscribers.lock().push(tx);
366 rx
367 }
368
369 pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
370 let (tx, rx) = mpsc::unbounded();
371 self.remote_video_track_subscribers.lock().push(tx);
372 rx
373 }
374
375 fn did_subscribe_to_remote_audio_track(&self, track: RemoteAudioTrack) {
376 let track = Arc::new(track);
377 self.remote_audio_track_subscribers.lock().retain(|tx| {
378 tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(track.clone()))
379 .is_ok()
380 });
381 }
382
383 fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
384 self.remote_audio_track_subscribers.lock().retain(|tx| {
385 tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
386 publisher_id: publisher_id.clone(),
387 track_id: track_id.clone(),
388 })
389 .is_ok()
390 });
391 }
392
393 fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
394 self.remote_audio_track_subscribers.lock().retain(|tx| {
395 tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
396 track_id: track_id.clone(),
397 muted,
398 })
399 .is_ok()
400 });
401 }
402
403 // A vec of publisher IDs
404 fn active_speakers_changed(&self, speakers: Vec<String>) {
405 self.remote_audio_track_subscribers
406 .lock()
407 .retain(move |tx| {
408 tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
409 speakers: speakers.clone(),
410 })
411 .is_ok()
412 });
413 }
414
415 fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
416 let track = Arc::new(track);
417 self.remote_video_track_subscribers.lock().retain(|tx| {
418 tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
419 .is_ok()
420 });
421 }
422
423 fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
424 self.remote_video_track_subscribers.lock().retain(|tx| {
425 tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
426 publisher_id: publisher_id.clone(),
427 track_id: track_id.clone(),
428 })
429 .is_ok()
430 });
431 }
432
433 fn build_done_callback() -> (
434 extern "C" fn(*mut c_void, CFStringRef),
435 *mut c_void,
436 oneshot::Receiver<Result<()>>,
437 ) {
438 let (tx, rx) = oneshot::channel();
439 extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
440 let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
441 if error.is_null() {
442 let _ = tx.send(Ok(()));
443 } else {
444 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
445 let _ = tx.send(Err(anyhow!(error)));
446 }
447 }
448 (
449 done_callback,
450 Box::into_raw(Box::new(tx)) as *mut c_void,
451 rx,
452 )
453 }
454}
455
456impl Drop for Room {
457 fn drop(&mut self) {
458 unsafe {
459 LKRoomDisconnect(self.native_room);
460 CFRelease(self.native_room);
461 }
462 }
463}
464
465struct RoomDelegate {
466 native_delegate: *const c_void,
467 weak_room: *const Room,
468}
469
470impl RoomDelegate {
471 fn new(weak_room: Weak<Room>) -> Self {
472 let weak_room = Weak::into_raw(weak_room);
473 let native_delegate = unsafe {
474 LKRoomDelegateCreate(
475 weak_room as *mut c_void,
476 Self::on_did_disconnect,
477 Self::on_did_subscribe_to_remote_audio_track,
478 Self::on_did_unsubscribe_from_remote_audio_track,
479 Self::on_mute_change_from_remote_audio_track,
480 Self::on_active_speakers_changed,
481 Self::on_did_subscribe_to_remote_video_track,
482 Self::on_did_unsubscribe_from_remote_video_track,
483 )
484 };
485 Self {
486 native_delegate,
487 weak_room,
488 }
489 }
490
491 extern "C" fn on_did_disconnect(room: *mut c_void) {
492 let room = unsafe { Weak::from_raw(room as *mut Room) };
493 if let Some(room) = room.upgrade() {
494 room.did_disconnect();
495 }
496 let _ = Weak::into_raw(room);
497 }
498
499 extern "C" fn on_did_subscribe_to_remote_audio_track(
500 room: *mut c_void,
501 publisher_id: CFStringRef,
502 track_id: CFStringRef,
503 track: *const c_void,
504 ) {
505 let room = unsafe { Weak::from_raw(room as *mut Room) };
506 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
507 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
508 let track = RemoteAudioTrack::new(track, track_id, publisher_id);
509 if let Some(room) = room.upgrade() {
510 room.did_subscribe_to_remote_audio_track(track);
511 }
512 let _ = Weak::into_raw(room);
513 }
514
515 extern "C" fn on_did_unsubscribe_from_remote_audio_track(
516 room: *mut c_void,
517 publisher_id: CFStringRef,
518 track_id: CFStringRef,
519 ) {
520 let room = unsafe { Weak::from_raw(room as *mut Room) };
521 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
522 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
523 if let Some(room) = room.upgrade() {
524 room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
525 }
526 let _ = Weak::into_raw(room);
527 }
528
529 extern "C" fn on_mute_change_from_remote_audio_track(
530 room: *mut c_void,
531 track_id: CFStringRef,
532 muted: bool,
533 ) {
534 let room = unsafe { Weak::from_raw(room as *mut Room) };
535 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
536 if let Some(room) = room.upgrade() {
537 room.mute_changed_from_remote_audio_track(track_id, muted);
538 }
539 let _ = Weak::into_raw(room);
540 }
541
542 extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) {
543 if participants.is_null() {
544 return;
545 }
546
547 let room = unsafe { Weak::from_raw(room as *mut Room) };
548 let speakers = unsafe {
549 CFArray::wrap_under_get_rule(participants)
550 .into_iter()
551 .map(
552 |speaker: core_foundation::base::ItemRef<'_, *const c_void>| {
553 CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string()
554 },
555 )
556 .collect()
557 };
558
559 if let Some(room) = room.upgrade() {
560 room.active_speakers_changed(speakers);
561 }
562 let _ = Weak::into_raw(room);
563 }
564
565 extern "C" fn on_did_subscribe_to_remote_video_track(
566 room: *mut c_void,
567 publisher_id: CFStringRef,
568 track_id: CFStringRef,
569 track: *const c_void,
570 ) {
571 let room = unsafe { Weak::from_raw(room as *mut Room) };
572 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
573 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
574 let track = RemoteVideoTrack::new(track, track_id, publisher_id);
575 if let Some(room) = room.upgrade() {
576 room.did_subscribe_to_remote_video_track(track);
577 }
578 let _ = Weak::into_raw(room);
579 }
580
581 extern "C" fn on_did_unsubscribe_from_remote_video_track(
582 room: *mut c_void,
583 publisher_id: CFStringRef,
584 track_id: CFStringRef,
585 ) {
586 let room = unsafe { Weak::from_raw(room as *mut Room) };
587 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
588 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
589 if let Some(room) = room.upgrade() {
590 room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
591 }
592 let _ = Weak::into_raw(room);
593 }
594}
595
596impl Drop for RoomDelegate {
597 fn drop(&mut self) {
598 unsafe {
599 CFRelease(self.native_delegate);
600 let _ = Weak::from_raw(self.weak_room);
601 }
602 }
603}
604
605pub struct LocalAudioTrack(*const c_void);
606
607impl LocalAudioTrack {
608 pub fn create() -> Self {
609 Self(unsafe { LKLocalAudioTrackCreateTrack() })
610 }
611}
612
613impl Drop for LocalAudioTrack {
614 fn drop(&mut self) {
615 unsafe { CFRelease(self.0) }
616 }
617}
618
619pub struct LocalVideoTrack(*const c_void);
620
621impl LocalVideoTrack {
622 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
623 Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
624 }
625}
626
627impl Drop for LocalVideoTrack {
628 fn drop(&mut self) {
629 unsafe { CFRelease(self.0) }
630 }
631}
632
633pub struct LocalTrackPublication(*const c_void);
634
635impl LocalTrackPublication {
636 pub fn new(native_track_publication: *const c_void) -> Self {
637 unsafe {
638 CFRetain(native_track_publication);
639 }
640 Self(native_track_publication)
641 }
642
643 pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
644 let (tx, rx) = futures::channel::oneshot::channel();
645
646 extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
647 let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
648 if error.is_null() {
649 tx.send(Ok(())).ok();
650 } else {
651 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
652 tx.send(Err(anyhow!(error))).ok();
653 }
654 }
655
656 unsafe {
657 LKLocalTrackPublicationSetMute(
658 self.0,
659 muted,
660 complete_callback,
661 Box::into_raw(Box::new(tx)) as *mut c_void,
662 )
663 }
664
665 async move { rx.await.unwrap() }
666 }
667}
668
669impl Drop for LocalTrackPublication {
670 fn drop(&mut self) {
671 unsafe { CFRelease(self.0) }
672 }
673}
674
675pub struct RemoteTrackPublication(*const c_void);
676
677impl RemoteTrackPublication {
678 pub fn new(native_track_publication: *const c_void) -> Self {
679 unsafe {
680 CFRetain(native_track_publication);
681 }
682 Self(native_track_publication)
683 }
684
685 pub fn set_enabled(&self, enabled: bool) -> impl Future<Output = Result<()>> {
686 let (tx, rx) = futures::channel::oneshot::channel();
687
688 extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
689 let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
690 if error.is_null() {
691 tx.send(Ok(())).ok();
692 } else {
693 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
694 tx.send(Err(anyhow!(error))).ok();
695 }
696 }
697
698 unsafe {
699 LKRemoteTrackPublicationSetEnabled(
700 self.0,
701 enabled,
702 complete_callback,
703 Box::into_raw(Box::new(tx)) as *mut c_void,
704 )
705 }
706
707 async move { rx.await.unwrap() }
708 }
709}
710
711impl Drop for RemoteTrackPublication {
712 fn drop(&mut self) {
713 unsafe { CFRelease(self.0) }
714 }
715}
716
717#[derive(Debug)]
718pub struct RemoteAudioTrack {
719 _native_track: *const c_void,
720 sid: Sid,
721 publisher_id: String,
722}
723
724impl RemoteAudioTrack {
725 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
726 unsafe {
727 CFRetain(native_track);
728 }
729 Self {
730 _native_track: native_track,
731 sid,
732 publisher_id,
733 }
734 }
735
736 pub fn sid(&self) -> &str {
737 &self.sid
738 }
739
740 pub fn publisher_id(&self) -> &str {
741 &self.publisher_id
742 }
743
744 pub fn enable(&self) -> impl Future<Output = Result<()>> {
745 async { Ok(()) }
746 }
747
748 pub fn disable(&self) -> impl Future<Output = Result<()>> {
749 async { Ok(()) }
750 }
751}
752
753#[derive(Debug)]
754pub struct RemoteVideoTrack {
755 native_track: *const c_void,
756 sid: Sid,
757 publisher_id: String,
758}
759
760impl RemoteVideoTrack {
761 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
762 unsafe {
763 CFRetain(native_track);
764 }
765 Self {
766 native_track,
767 sid,
768 publisher_id,
769 }
770 }
771
772 pub fn sid(&self) -> &str {
773 &self.sid
774 }
775
776 pub fn publisher_id(&self) -> &str {
777 &self.publisher_id
778 }
779
780 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
781 extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
782 unsafe {
783 let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
784 let buffer = CVImageBuffer::wrap_under_get_rule(frame);
785 let result = tx.try_broadcast(Frame(buffer));
786 let _ = Box::into_raw(tx);
787 match result {
788 Ok(_) => true,
789 Err(async_broadcast::TrySendError::Closed(_))
790 | Err(async_broadcast::TrySendError::Inactive(_)) => {
791 log::warn!("no active receiver for frame");
792 false
793 }
794 Err(async_broadcast::TrySendError::Full(_)) => {
795 log::warn!("skipping frame as receiver is not keeping up");
796 true
797 }
798 }
799 }
800 }
801
802 extern "C" fn on_drop(callback_data: *mut c_void) {
803 unsafe {
804 let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
805 }
806 }
807
808 let (tx, rx) = async_broadcast::broadcast(64);
809 unsafe {
810 let renderer = LKVideoRendererCreate(
811 Box::into_raw(Box::new(tx)) as *mut c_void,
812 on_frame,
813 on_drop,
814 );
815 LKVideoTrackAddRenderer(self.native_track, renderer);
816 rx
817 }
818 }
819}
820
821impl Drop for RemoteVideoTrack {
822 fn drop(&mut self) {
823 unsafe { CFRelease(self.native_track) }
824 }
825}
826
827pub enum RemoteVideoTrackUpdate {
828 Subscribed(Arc<RemoteVideoTrack>),
829 Unsubscribed { publisher_id: Sid, track_id: Sid },
830}
831
832pub enum RemoteAudioTrackUpdate {
833 ActiveSpeakersChanged { speakers: Vec<Sid> },
834 MuteChanged { track_id: Sid, muted: bool },
835 Subscribed(Arc<RemoteAudioTrack>),
836 Unsubscribed { publisher_id: Sid, track_id: Sid },
837}
838
839pub struct MacOSDisplay(*const c_void);
840
841impl MacOSDisplay {
842 fn new(ptr: *const c_void) -> Self {
843 unsafe {
844 CFRetain(ptr);
845 }
846 Self(ptr)
847 }
848}
849
850impl Drop for MacOSDisplay {
851 fn drop(&mut self) {
852 unsafe { CFRelease(self.0) }
853 }
854}
855
856#[derive(Clone)]
857pub struct Frame(CVImageBuffer);
858
859impl Frame {
860 pub fn width(&self) -> usize {
861 self.0.width()
862 }
863
864 pub fn height(&self) -> usize {
865 self.0.height()
866 }
867
868 pub fn image(&self) -> CVImageBuffer {
869 self.0.clone()
870 }
871}