prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use postage::watch;
 15use std::{
 16    ffi::c_void,
 17    sync::{Arc, Weak},
 18};
 19
// Foreign functions provided by the native LiveKit bridge that this file links against.
//
// Conventions visible in these signatures:
// - Native objects are passed around as opaque `*const c_void` handles.
// - Asynchronous operations take a `callback` / `on_complete` function pointer together with a
//   `callback_data` pointer; the Rust side in this file uses `callback_data` to smuggle a boxed
//   channel sender (or a raw `Weak<Room>`) through the native call.
// - Errors are reported as a `CFStringRef`; callers in this file treat a null string as success.
extern "C" {
    // Creates the native room delegate. `callback_data` is presumably handed back as the first
    // argument of every callback below — TODO confirm against the native implementation.
    fn LKRoomDelegateCreate(
        callback_data: *mut c_void,
        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
        on_did_subscribe_to_remote_audio_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
            remote_track: *const c_void,
            remote_publication: *const c_void,
        ),
        on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
        ),
        on_mute_changed_from_remote_audio_track: extern "C" fn(
            callback_data: *mut c_void,
            track_id: CFStringRef,
            muted: bool,
        ),
        on_active_speakers_changed: extern "C" fn(
            callback_data: *mut c_void,
            participants: CFArrayRef,
        ),
        on_did_subscribe_to_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
            remote_track: *const c_void,
        ),
        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
        ),
    ) -> *const c_void;

    // Room lifecycle. `LKRoomCreate` takes the delegate handle created above.
    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
    fn LKRoomConnect(
        room: *const c_void,
        url: CFStringRef,
        token: CFStringRef,
        callback: extern "C" fn(*mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    fn LKRoomDisconnect(room: *const c_void);

    // Publishing local tracks. The callback receives (callback_data, publication, error).
    fn LKRoomPublishVideoTrack(
        room: *const c_void,
        track: *const c_void,
        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    fn LKRoomPublishAudioTrack(
        room: *const c_void,
        track: *const c_void,
        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);

    // Queries for a remote participant's tracks/publications. Callers in this file treat a
    // null array as "no results".
    fn LKRoomAudioTracksForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    fn LKRoomAudioTrackPublicationsForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    fn LKRoomVideoTracksForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    // Creates a video renderer that calls `on_frame` for frames and `on_drop` when the
    // renderer goes away (presumably exactly once — TODO confirm).
    fn LKVideoRendererCreate(
        callback_data: *mut c_void,
        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
        on_drop: extern "C" fn(callback_data: *mut c_void),
    ) -> *const c_void;

    fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef;
    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;

    // Asynchronously lists display sources; the callback receives either an array of sources
    // or an error string.
    fn LKDisplaySources(
        callback_data: *mut c_void,
        callback: extern "C" fn(
            callback_data: *mut c_void,
            sources: CFArrayRef,
            error: CFStringRef,
        ),
    );
    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
    fn LKLocalAudioTrackCreateTrack() -> *const c_void;

    // Mute / enable toggles on publications; completion is reported through `on_complete`.
    fn LKLocalTrackPublicationSetMute(
        publication: *const c_void,
        muted: bool,
        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
        callback_data: *mut c_void,
    );

    fn LKRemoteTrackPublicationSetEnabled(
        publication: *const c_void,
        enabled: bool,
        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
        callback_data: *mut c_void,
    );

    fn LKRemoteTrackPublicationIsMuted(publication: *const c_void) -> bool;
    fn LKRemoteTrackPublicationGetSid(publication: *const c_void) -> CFStringRef;
}
133
/// String identifier; in this file it is used for track sids and for publisher/speaker ids.
pub type Sid = String;
135
/// Connection status of a [`Room`], observable through [`Room::status`].
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// Initial state of a new room; also restored when the native disconnect callback fires.
    Disconnected,
    /// Set once the future returned by [`Room::connect`] resolves successfully; carries the
    /// URL and token that were used for that connection.
    Connected { url: String, token: String },
}
141
/// A LiveKit room backed by a native room object.
///
/// Instances are created with [`Room::new`], which returns an `Arc<Room>`; the native delegate
/// holds a `Weak` reference back to the room so that native callbacks can reach it.
pub struct Room {
    /// Opaque handle returned by `LKRoomCreate`; disconnected and released when the room drops.
    native_room: *const c_void,
    /// Sender/receiver pair of the connection-state watch channel. The receiver is cloned out
    /// by `status`, the sender is written to on connect/disconnect.
    connection: Mutex<(
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    )>,
    /// Subscribers registered via `remote_audio_track_updates`; senders whose receiver has
    /// gone away are pruned on the next broadcast.
    remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
    /// Subscribers registered via `remote_video_track_updates`; pruned the same way.
    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
    /// Kept only so the native delegate stays alive for as long as the room does.
    _delegate: RoomDelegate,
}

// `Room` contains a raw pointer, so `Send`/`Sync` are not derived automatically and have to be
// asserted by hand.
// SAFETY: LiveKit objects are thread-safe: https://github.com/livekit/client-sdk-swift#thread-safety
unsafe impl Send for Room {}
unsafe impl Sync for Room {}
156
157impl Room {
158    pub fn new() -> Arc<Self> {
159        Arc::new_cyclic(|weak_room| {
160            let delegate = RoomDelegate::new(weak_room.clone());
161            Self {
162                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
163                connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
164                remote_audio_track_subscribers: Default::default(),
165                remote_video_track_subscribers: Default::default(),
166                _delegate: delegate,
167            }
168        })
169    }
170
171    pub fn status(&self) -> watch::Receiver<ConnectionState> {
172        self.connection.lock().1.clone()
173    }
174
175    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
176        let url = CFString::new(url);
177        let token = CFString::new(token);
178        let (did_connect, tx, rx) = Self::build_done_callback();
179        unsafe {
180            LKRoomConnect(
181                self.native_room,
182                url.as_concrete_TypeRef(),
183                token.as_concrete_TypeRef(),
184                did_connect,
185                tx,
186            )
187        }
188
189        let this = self.clone();
190        let url = url.to_string();
191        let token = token.to_string();
192        async move {
193            rx.await.unwrap().context("error connecting to room")?;
194            *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
195            Ok(())
196        }
197    }
198
199    fn did_disconnect(&self) {
200        *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
201    }
202
203    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
204        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
205            unsafe {
206                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
207
208                if sources.is_null() {
209                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
210                } else {
211                    let sources = CFArray::wrap_under_get_rule(sources)
212                        .into_iter()
213                        .map(|source| MacOSDisplay::new(*source))
214                        .collect();
215
216                    let _ = tx.send(Ok(sources));
217                }
218            }
219        }
220
221        let (tx, rx) = oneshot::channel();
222
223        unsafe {
224            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
225        }
226
227        async move { rx.await.unwrap() }
228    }
229
230    pub fn publish_video_track(
231        self: &Arc<Self>,
232        track: &LocalVideoTrack,
233    ) -> impl Future<Output = Result<LocalTrackPublication>> {
234        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
235        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
236            let tx =
237                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
238            if error.is_null() {
239                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
240            } else {
241                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
242                let _ = tx.send(Err(anyhow!(error)));
243            }
244        }
245        unsafe {
246            LKRoomPublishVideoTrack(
247                self.native_room,
248                track.0,
249                callback,
250                Box::into_raw(Box::new(tx)) as *mut c_void,
251            );
252        }
253        async { rx.await.unwrap().context("error publishing video track") }
254    }
255
256    pub fn publish_audio_track(
257        self: &Arc<Self>,
258        track: &LocalAudioTrack,
259    ) -> impl Future<Output = Result<LocalTrackPublication>> {
260        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
261        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
262            let tx =
263                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
264            if error.is_null() {
265                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
266            } else {
267                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
268                let _ = tx.send(Err(anyhow!(error)));
269            }
270        }
271        unsafe {
272            LKRoomPublishAudioTrack(
273                self.native_room,
274                track.0,
275                callback,
276                Box::into_raw(Box::new(tx)) as *mut c_void,
277            );
278        }
279        async { rx.await.unwrap().context("error publishing audio track") }
280    }
281
282    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
283        unsafe {
284            LKRoomUnpublishTrack(self.native_room, publication.0);
285        }
286    }
287
288    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
289        unsafe {
290            let tracks = LKRoomVideoTracksForRemoteParticipant(
291                self.native_room,
292                CFString::new(participant_id).as_concrete_TypeRef(),
293            );
294
295            if tracks.is_null() {
296                Vec::new()
297            } else {
298                let tracks = CFArray::wrap_under_get_rule(tracks);
299                tracks
300                    .into_iter()
301                    .map(|native_track| {
302                        let native_track = *native_track;
303                        let id =
304                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
305                                .to_string();
306                        Arc::new(RemoteVideoTrack::new(
307                            native_track,
308                            id,
309                            participant_id.into(),
310                        ))
311                    })
312                    .collect()
313            }
314        }
315    }
316
317    pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
318        unsafe {
319            let tracks = LKRoomAudioTracksForRemoteParticipant(
320                self.native_room,
321                CFString::new(participant_id).as_concrete_TypeRef(),
322            );
323
324            if tracks.is_null() {
325                Vec::new()
326            } else {
327                let tracks = CFArray::wrap_under_get_rule(tracks);
328                tracks
329                    .into_iter()
330                    .map(|native_track| {
331                        let native_track = *native_track;
332                        let id =
333                            CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
334                                .to_string();
335                        Arc::new(RemoteAudioTrack::new(
336                            native_track,
337                            id,
338                            participant_id.into(),
339                        ))
340                    })
341                    .collect()
342            }
343        }
344    }
345
346    pub fn remote_audio_track_publications(
347        &self,
348        participant_id: &str,
349    ) -> Vec<Arc<RemoteTrackPublication>> {
350        unsafe {
351            let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant(
352                self.native_room,
353                CFString::new(participant_id).as_concrete_TypeRef(),
354            );
355
356            if tracks.is_null() {
357                Vec::new()
358            } else {
359                let tracks = CFArray::wrap_under_get_rule(tracks);
360                tracks
361                    .into_iter()
362                    .map(|native_track_publication| {
363                        let native_track_publication = *native_track_publication;
364                        Arc::new(RemoteTrackPublication::new(native_track_publication))
365                    })
366                    .collect()
367            }
368        }
369    }
370
371    pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
372        let (tx, rx) = mpsc::unbounded();
373        self.remote_audio_track_subscribers.lock().push(tx);
374        rx
375    }
376
377    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
378        let (tx, rx) = mpsc::unbounded();
379        self.remote_video_track_subscribers.lock().push(tx);
380        rx
381    }
382
383    fn did_subscribe_to_remote_audio_track(
384        &self,
385        track: RemoteAudioTrack,
386        publication: RemoteTrackPublication,
387    ) {
388        let track = Arc::new(track);
389        let publication = Arc::new(publication);
390        self.remote_audio_track_subscribers.lock().retain(|tx| {
391            tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(
392                track.clone(),
393                publication.clone(),
394            ))
395            .is_ok()
396        });
397    }
398
399    fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
400        self.remote_audio_track_subscribers.lock().retain(|tx| {
401            tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
402                publisher_id: publisher_id.clone(),
403                track_id: track_id.clone(),
404            })
405            .is_ok()
406        });
407    }
408
409    fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
410        self.remote_audio_track_subscribers.lock().retain(|tx| {
411            tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
412                track_id: track_id.clone(),
413                muted,
414            })
415            .is_ok()
416        });
417    }
418
419    // A vec of publisher IDs
420    fn active_speakers_changed(&self, speakers: Vec<String>) {
421        self.remote_audio_track_subscribers
422            .lock()
423            .retain(move |tx| {
424                tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
425                    speakers: speakers.clone(),
426                })
427                .is_ok()
428            });
429    }
430
431    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
432        let track = Arc::new(track);
433        self.remote_video_track_subscribers.lock().retain(|tx| {
434            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
435                .is_ok()
436        });
437    }
438
439    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
440        self.remote_video_track_subscribers.lock().retain(|tx| {
441            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
442                publisher_id: publisher_id.clone(),
443                track_id: track_id.clone(),
444            })
445            .is_ok()
446        });
447    }
448
449    fn build_done_callback() -> (
450        extern "C" fn(*mut c_void, CFStringRef),
451        *mut c_void,
452        oneshot::Receiver<Result<()>>,
453    ) {
454        let (tx, rx) = oneshot::channel();
455        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
456            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
457            if error.is_null() {
458                let _ = tx.send(Ok(()));
459            } else {
460                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
461                let _ = tx.send(Err(anyhow!(error)));
462            }
463        }
464        (
465            done_callback,
466            Box::into_raw(Box::new(tx)) as *mut c_void,
467            rx,
468        )
469    }
470}
471
472impl Drop for Room {
473    fn drop(&mut self) {
474        unsafe {
475            LKRoomDisconnect(self.native_room);
476            CFRelease(self.native_room);
477        }
478    }
479}
480
/// Owns the native delegate object whose callbacks forward room events to a [`Room`].
struct RoomDelegate {
    /// Handle returned by `LKRoomDelegateCreate`; released with `CFRelease` on drop.
    native_delegate: *const c_void,
    /// Raw form (`Weak::into_raw`) of the weak room reference that is passed to the native
    /// side as callback data; turned back into a `Weak` and dropped when the delegate drops.
    weak_room: *const Room,
}
485
486impl RoomDelegate {
487    fn new(weak_room: Weak<Room>) -> Self {
488        let weak_room = Weak::into_raw(weak_room);
489        let native_delegate = unsafe {
490            LKRoomDelegateCreate(
491                weak_room as *mut c_void,
492                Self::on_did_disconnect,
493                Self::on_did_subscribe_to_remote_audio_track,
494                Self::on_did_unsubscribe_from_remote_audio_track,
495                Self::on_mute_change_from_remote_audio_track,
496                Self::on_active_speakers_changed,
497                Self::on_did_subscribe_to_remote_video_track,
498                Self::on_did_unsubscribe_from_remote_video_track,
499            )
500        };
501        Self {
502            native_delegate,
503            weak_room,
504        }
505    }
506
507    extern "C" fn on_did_disconnect(room: *mut c_void) {
508        let room = unsafe { Weak::from_raw(room as *mut Room) };
509        if let Some(room) = room.upgrade() {
510            room.did_disconnect();
511        }
512        let _ = Weak::into_raw(room);
513    }
514
515    extern "C" fn on_did_subscribe_to_remote_audio_track(
516        room: *mut c_void,
517        publisher_id: CFStringRef,
518        track_id: CFStringRef,
519        track: *const c_void,
520        publication: *const c_void,
521    ) {
522        let room = unsafe { Weak::from_raw(room as *mut Room) };
523        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
524        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
525        let track = RemoteAudioTrack::new(track, track_id, publisher_id);
526        let publication = RemoteTrackPublication::new(publication);
527        if let Some(room) = room.upgrade() {
528            room.did_subscribe_to_remote_audio_track(track, publication);
529        }
530        let _ = Weak::into_raw(room);
531    }
532
533    extern "C" fn on_did_unsubscribe_from_remote_audio_track(
534        room: *mut c_void,
535        publisher_id: CFStringRef,
536        track_id: CFStringRef,
537    ) {
538        let room = unsafe { Weak::from_raw(room as *mut Room) };
539        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
540        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
541        if let Some(room) = room.upgrade() {
542            room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
543        }
544        let _ = Weak::into_raw(room);
545    }
546
547    extern "C" fn on_mute_change_from_remote_audio_track(
548        room: *mut c_void,
549        track_id: CFStringRef,
550        muted: bool,
551    ) {
552        let room = unsafe { Weak::from_raw(room as *mut Room) };
553        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
554        if let Some(room) = room.upgrade() {
555            room.mute_changed_from_remote_audio_track(track_id, muted);
556        }
557        let _ = Weak::into_raw(room);
558    }
559
560    extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) {
561        if participants.is_null() {
562            return;
563        }
564
565        let room = unsafe { Weak::from_raw(room as *mut Room) };
566        let speakers = unsafe {
567            CFArray::wrap_under_get_rule(participants)
568                .into_iter()
569                .map(
570                    |speaker: core_foundation::base::ItemRef<'_, *const c_void>| {
571                        CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string()
572                    },
573                )
574                .collect()
575        };
576
577        if let Some(room) = room.upgrade() {
578            room.active_speakers_changed(speakers);
579        }
580        let _ = Weak::into_raw(room);
581    }
582
583    extern "C" fn on_did_subscribe_to_remote_video_track(
584        room: *mut c_void,
585        publisher_id: CFStringRef,
586        track_id: CFStringRef,
587        track: *const c_void,
588    ) {
589        let room = unsafe { Weak::from_raw(room as *mut Room) };
590        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
591        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
592        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
593        if let Some(room) = room.upgrade() {
594            room.did_subscribe_to_remote_video_track(track);
595        }
596        let _ = Weak::into_raw(room);
597    }
598
599    extern "C" fn on_did_unsubscribe_from_remote_video_track(
600        room: *mut c_void,
601        publisher_id: CFStringRef,
602        track_id: CFStringRef,
603    ) {
604        let room = unsafe { Weak::from_raw(room as *mut Room) };
605        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
606        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
607        if let Some(room) = room.upgrade() {
608            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
609        }
610        let _ = Weak::into_raw(room);
611    }
612}
613
614impl Drop for RoomDelegate {
615    fn drop(&mut self) {
616        unsafe {
617            CFRelease(self.native_delegate);
618            let _ = Weak::from_raw(self.weak_room);
619        }
620    }
621}
622
623pub struct LocalAudioTrack(*const c_void);
624unsafe impl Send for LocalAudioTrack {}
625// todo!(Sync is not ok here. We need to remove it)
626unsafe impl Sync for LocalAudioTrack {}
627
628impl LocalAudioTrack {
629    pub fn create() -> Self {
630        Self(unsafe { LKLocalAudioTrackCreateTrack() })
631    }
632}
633
634impl Drop for LocalAudioTrack {
635    fn drop(&mut self) {
636        unsafe { CFRelease(self.0) }
637    }
638}
639
640pub struct LocalVideoTrack(*const c_void);
641unsafe impl Send for LocalVideoTrack {}
642// todo!(Sync is not ok here. We need to remove it)
643unsafe impl Sync for LocalVideoTrack {}
644
645impl LocalVideoTrack {
646    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
647        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
648    }
649}
650
651impl Drop for LocalVideoTrack {
652    fn drop(&mut self) {
653        unsafe { CFRelease(self.0) }
654    }
655}
656
657pub struct LocalTrackPublication(*const c_void);
658unsafe impl Send for LocalTrackPublication {}
659// todo!(Sync is not ok here. We need to remove it)
660unsafe impl Sync for LocalTrackPublication {}
661
662impl LocalTrackPublication {
663    pub fn new(native_track_publication: *const c_void) -> Self {
664        unsafe {
665            CFRetain(native_track_publication);
666        }
667        Self(native_track_publication)
668    }
669
670    pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
671        let (tx, rx) = futures::channel::oneshot::channel();
672
673        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
674            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
675            if error.is_null() {
676                tx.send(Ok(())).ok();
677            } else {
678                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
679                tx.send(Err(anyhow!(error))).ok();
680            }
681        }
682
683        unsafe {
684            LKLocalTrackPublicationSetMute(
685                self.0,
686                muted,
687                complete_callback,
688                Box::into_raw(Box::new(tx)) as *mut c_void,
689            )
690        }
691
692        async move { rx.await.unwrap() }
693    }
694}
695
696impl Drop for LocalTrackPublication {
697    fn drop(&mut self) {
698        unsafe { CFRelease(self.0) }
699    }
700}
701
702pub struct RemoteTrackPublication(*const c_void);
703
704unsafe impl Send for RemoteTrackPublication {}
705// todo!(Sync is not ok here. We need to remove it)
706unsafe impl Sync for RemoteTrackPublication {}
707
708impl RemoteTrackPublication {
709    pub fn new(native_track_publication: *const c_void) -> Self {
710        unsafe {
711            CFRetain(native_track_publication);
712        }
713        Self(native_track_publication)
714    }
715
716    pub fn sid(&self) -> String {
717        unsafe { CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid(self.0)).to_string() }
718    }
719
720    pub fn is_muted(&self) -> bool {
721        unsafe { LKRemoteTrackPublicationIsMuted(self.0) }
722    }
723
724    pub fn set_enabled(&self, enabled: bool) -> impl Future<Output = Result<()>> {
725        let (tx, rx) = futures::channel::oneshot::channel();
726
727        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
728            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
729            if error.is_null() {
730                tx.send(Ok(())).ok();
731            } else {
732                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
733                tx.send(Err(anyhow!(error))).ok();
734            }
735        }
736
737        unsafe {
738            LKRemoteTrackPublicationSetEnabled(
739                self.0,
740                enabled,
741                complete_callback,
742                Box::into_raw(Box::new(tx)) as *mut c_void,
743            )
744        }
745
746        async move { rx.await.unwrap() }
747    }
748}
749
750impl Drop for RemoteTrackPublication {
751    fn drop(&mut self) {
752        unsafe { CFRelease(self.0) }
753    }
754}
755
756#[derive(Debug)]
757pub struct RemoteAudioTrack {
758    _native_track: *const c_void,
759    sid: Sid,
760    publisher_id: String,
761}
762
763unsafe impl Send for RemoteAudioTrack {}
764// todo!(Sync is not ok here. We need to remove it)
765unsafe impl Sync for RemoteAudioTrack {}
766
767impl RemoteAudioTrack {
768    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
769        unsafe {
770            CFRetain(native_track);
771        }
772        Self {
773            _native_track: native_track,
774            sid,
775            publisher_id,
776        }
777    }
778
779    pub fn sid(&self) -> &str {
780        &self.sid
781    }
782
783    pub fn publisher_id(&self) -> &str {
784        &self.publisher_id
785    }
786
787    pub fn enable(&self) -> impl Future<Output = Result<()>> {
788        async { Ok(()) }
789    }
790
791    pub fn disable(&self) -> impl Future<Output = Result<()>> {
792        async { Ok(()) }
793    }
794}
795
796#[derive(Debug)]
797pub struct RemoteVideoTrack {
798    native_track: *const c_void,
799    sid: Sid,
800    publisher_id: String,
801}
802
803unsafe impl Send for RemoteVideoTrack {}
804// todo!(Sync is not ok here. We need to remove it)
805unsafe impl Sync for RemoteVideoTrack {}
806
807impl RemoteVideoTrack {
808    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
809        unsafe {
810            CFRetain(native_track);
811        }
812        Self {
813            native_track,
814            sid,
815            publisher_id,
816        }
817    }
818
819    pub fn sid(&self) -> &str {
820        &self.sid
821    }
822
823    pub fn publisher_id(&self) -> &str {
824        &self.publisher_id
825    }
826
827    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
828        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
829            unsafe {
830                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
831                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
832                let result = tx.try_broadcast(Frame(buffer));
833                let _ = Box::into_raw(tx);
834                match result {
835                    Ok(_) => true,
836                    Err(async_broadcast::TrySendError::Closed(_))
837                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
838                        log::warn!("no active receiver for frame");
839                        false
840                    }
841                    Err(async_broadcast::TrySendError::Full(_)) => {
842                        log::warn!("skipping frame as receiver is not keeping up");
843                        true
844                    }
845                }
846            }
847        }
848
849        extern "C" fn on_drop(callback_data: *mut c_void) {
850            unsafe {
851                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
852            }
853        }
854
855        let (tx, rx) = async_broadcast::broadcast(64);
856        unsafe {
857            let renderer = LKVideoRendererCreate(
858                Box::into_raw(Box::new(tx)) as *mut c_void,
859                on_frame,
860                on_drop,
861            );
862            LKVideoTrackAddRenderer(self.native_track, renderer);
863            rx
864        }
865    }
866}
867
868impl Drop for RemoteVideoTrack {
869    fn drop(&mut self) {
870        unsafe { CFRelease(self.native_track) }
871    }
872}
873
/// Event delivered to receivers obtained from `Room::remote_video_track_updates`.
pub enum RemoteVideoTrackUpdate {
    /// A remote video track was subscribed to.
    Subscribed(Arc<RemoteVideoTrack>),
    /// A remote video track was unsubscribed from.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
878
/// Event delivered to receivers obtained from `Room::remote_audio_track_updates`.
pub enum RemoteAudioTrackUpdate {
    /// The set of active speakers changed; `speakers` holds publisher ids.
    ActiveSpeakersChanged { speakers: Vec<Sid> },
    /// The mute state of the given remote audio track changed.
    MuteChanged { track_id: Sid, muted: bool },
    /// A remote audio track was subscribed to; carries the track and its publication.
    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
    /// A remote audio track was unsubscribed from.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
885
886pub struct MacOSDisplay(*const c_void);
887
888unsafe impl Send for MacOSDisplay {}
889// todo!(Sync is not ok here. We need to remove it)
890unsafe impl Sync for MacOSDisplay {}
891
892impl MacOSDisplay {
893    fn new(ptr: *const c_void) -> Self {
894        unsafe {
895            CFRetain(ptr);
896        }
897        Self(ptr)
898    }
899}
900
901impl Drop for MacOSDisplay {
902    fn drop(&mut self) {
903        unsafe { CFRelease(self.0) }
904    }
905}
906
907#[derive(Clone)]
908pub struct Frame(CVImageBuffer);
909
910impl Frame {
911    pub fn width(&self) -> usize {
912        self.0.width()
913    }
914
915    pub fn height(&self) -> usize {
916        self.0.height()
917    }
918
919    pub fn image(&self) -> CVImageBuffer {
920        self.0.clone()
921    }
922}