prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use postage::watch;
 15use std::{
 16    ffi::c_void,
 17    sync::{Arc, Weak},
 18};
 19
 20extern "C" {
 21    fn LKRoomDelegateCreate(
 22        callback_data: *mut c_void,
 23        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
 24        on_did_subscribe_to_remote_audio_track: extern "C" fn(
 25            callback_data: *mut c_void,
 26            publisher_id: CFStringRef,
 27            track_id: CFStringRef,
 28            remote_track: *const c_void,
 29            remote_publication: *const c_void,
 30        ),
 31        on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
 32            callback_data: *mut c_void,
 33            publisher_id: CFStringRef,
 34            track_id: CFStringRef,
 35        ),
 36        on_mute_changed_from_remote_audio_track: extern "C" fn(
 37            callback_data: *mut c_void,
 38            track_id: CFStringRef,
 39            muted: bool,
 40        ),
 41        on_active_speakers_changed: extern "C" fn(
 42            callback_data: *mut c_void,
 43            participants: CFArrayRef,
 44        ),
 45        on_did_subscribe_to_remote_video_track: extern "C" fn(
 46            callback_data: *mut c_void,
 47            publisher_id: CFStringRef,
 48            track_id: CFStringRef,
 49            remote_track: *const c_void,
 50        ),
 51        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 52            callback_data: *mut c_void,
 53            publisher_id: CFStringRef,
 54            track_id: CFStringRef,
 55        ),
 56    ) -> *const c_void;
 57
 58    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
 59    fn LKRoomConnect(
 60        room: *const c_void,
 61        url: CFStringRef,
 62        token: CFStringRef,
 63        callback: extern "C" fn(*mut c_void, CFStringRef),
 64        callback_data: *mut c_void,
 65    );
 66    fn LKRoomDisconnect(room: *const c_void);
 67    fn LKRoomPublishVideoTrack(
 68        room: *const c_void,
 69        track: *const c_void,
 70        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 71        callback_data: *mut c_void,
 72    );
 73    fn LKRoomPublishAudioTrack(
 74        room: *const c_void,
 75        track: *const c_void,
 76        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 77        callback_data: *mut c_void,
 78    );
 79    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
 80    fn LKRoomAudioTracksForRemoteParticipant(
 81        room: *const c_void,
 82        participant_id: CFStringRef,
 83    ) -> CFArrayRef;
 84
 85    fn LKRoomAudioTrackPublicationsForRemoteParticipant(
 86        room: *const c_void,
 87        participant_id: CFStringRef,
 88    ) -> CFArrayRef;
 89
 90    fn LKRoomVideoTracksForRemoteParticipant(
 91        room: *const c_void,
 92        participant_id: CFStringRef,
 93    ) -> CFArrayRef;
 94
 95    fn LKVideoRendererCreate(
 96        callback_data: *mut c_void,
 97        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
 98        on_drop: extern "C" fn(callback_data: *mut c_void),
 99    ) -> *const c_void;
100
101    fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef;
102    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
103    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
104
105    fn LKDisplaySources(
106        callback_data: *mut c_void,
107        callback: extern "C" fn(
108            callback_data: *mut c_void,
109            sources: CFArrayRef,
110            error: CFStringRef,
111        ),
112    );
113    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
114    fn LKLocalAudioTrackCreateTrack() -> *const c_void;
115
116    fn LKLocalTrackPublicationSetMute(
117        publication: *const c_void,
118        muted: bool,
119        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
120        callback_data: *mut c_void,
121    );
122
123    fn LKRemoteTrackPublicationSetEnabled(
124        publication: *const c_void,
125        enabled: bool,
126        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
127        callback_data: *mut c_void,
128    );
129
130    fn LKRemoteTrackPublicationIsMuted(publication: *const c_void) -> bool;
131    fn LKRemoteTrackPublicationGetSid(publication: *const c_void) -> CFStringRef;
132}
133
/// Identifier string used in this module for tracks and publishers.
pub type Sid = String;
135
/// Connection status of a room, as observed through its status channel.
#[derive(Clone, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection is currently established.
    Disconnected,
    /// A connection attempt completed successfully with these parameters.
    Connected {
        /// URL that was passed to the connect call.
        url: String,
        /// Token that was passed to the connect call.
        token: String,
    },
}
141
142pub struct Room {
143    native_room: *const c_void,
144    connection: Mutex<(
145        watch::Sender<ConnectionState>,
146        watch::Receiver<ConnectionState>,
147    )>,
148    remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
149    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
150    _delegate: RoomDelegate,
151}
152
153// SAFETY: LiveKit objects are thread-safe: https://github.com/livekit/client-sdk-swift#thread-safety
154unsafe impl Send for Room {}
155unsafe impl Sync for Room {}
156
157impl Room {
158    pub fn new() -> Arc<Self> {
159        Arc::new_cyclic(|weak_room| {
160            let delegate = RoomDelegate::new(weak_room.clone());
161            Self {
162                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
163                connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
164                remote_audio_track_subscribers: Default::default(),
165                remote_video_track_subscribers: Default::default(),
166                _delegate: delegate,
167            }
168        })
169    }
170
171    pub fn status(&self) -> watch::Receiver<ConnectionState> {
172        self.connection.lock().1.clone()
173    }
174
175    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
176        let url = CFString::new(url);
177        let token = CFString::new(token);
178        let (did_connect, tx, rx) = Self::build_done_callback();
179        unsafe {
180            LKRoomConnect(
181                self.native_room,
182                url.as_concrete_TypeRef(),
183                token.as_concrete_TypeRef(),
184                did_connect,
185                tx,
186            )
187        }
188
189        let this = self.clone();
190        let url = url.to_string();
191        let token = token.to_string();
192        async move {
193            rx.await.unwrap().context("error connecting to room")?;
194            *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
195            Ok(())
196        }
197    }
198
199    fn did_disconnect(&self) {
200        *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
201    }
202
203    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
204        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
205            unsafe {
206                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
207
208                if sources.is_null() {
209                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
210                } else {
211                    let sources = CFArray::wrap_under_get_rule(sources)
212                        .into_iter()
213                        .map(|source| MacOSDisplay::new(*source))
214                        .collect();
215
216                    let _ = tx.send(Ok(sources));
217                }
218            }
219        }
220
221        let (tx, rx) = oneshot::channel();
222
223        unsafe {
224            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
225        }
226
227        async move { rx.await.unwrap() }
228    }
229
230    pub fn publish_video_track(
231        self: &Arc<Self>,
232        track: LocalVideoTrack,
233    ) -> impl Future<Output = Result<LocalTrackPublication>> {
234        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
235        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
236            let tx =
237                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
238            if error.is_null() {
239                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
240            } else {
241                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
242                let _ = tx.send(Err(anyhow!(error)));
243            }
244        }
245        unsafe {
246            LKRoomPublishVideoTrack(
247                self.native_room,
248                track.0,
249                callback,
250                Box::into_raw(Box::new(tx)) as *mut c_void,
251            );
252        }
253        async { rx.await.unwrap().context("error publishing video track") }
254    }
255
256    pub fn publish_audio_track(
257        self: &Arc<Self>,
258        track: LocalAudioTrack,
259    ) -> impl Future<Output = Result<LocalTrackPublication>> {
260        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
261        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
262            let tx =
263                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
264            if error.is_null() {
265                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
266            } else {
267                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
268                let _ = tx.send(Err(anyhow!(error)));
269            }
270        }
271        unsafe {
272            LKRoomPublishAudioTrack(
273                self.native_room,
274                track.0,
275                callback,
276                Box::into_raw(Box::new(tx)) as *mut c_void,
277            );
278        }
279        async { rx.await.unwrap().context("error publishing audio track") }
280    }
281
282    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
283        unsafe {
284            LKRoomUnpublishTrack(self.native_room, publication.0);
285        }
286    }
287
288    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
289        unsafe {
290            let tracks = LKRoomVideoTracksForRemoteParticipant(
291                self.native_room,
292                CFString::new(participant_id).as_concrete_TypeRef(),
293            );
294
295            if tracks.is_null() {
296                Vec::new()
297            } else {
298                let tracks = CFArray::wrap_under_get_rule(tracks);
299                tracks
300                    .into_iter()
301                    .map(|native_track| {
302                        let native_track = *native_track;
303                        let id =
304                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
305                                .to_string();
306                        Arc::new(RemoteVideoTrack::new(
307                            native_track,
308                            id,
309                            participant_id.into(),
310                        ))
311                    })
312                    .collect()
313            }
314        }
315    }
316
317    pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
318        unsafe {
319            let tracks = LKRoomAudioTracksForRemoteParticipant(
320                self.native_room,
321                CFString::new(participant_id).as_concrete_TypeRef(),
322            );
323
324            if tracks.is_null() {
325                Vec::new()
326            } else {
327                let tracks = CFArray::wrap_under_get_rule(tracks);
328                tracks
329                    .into_iter()
330                    .map(|native_track| {
331                        let native_track = *native_track;
332                        let id =
333                            CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
334                                .to_string();
335                        Arc::new(RemoteAudioTrack::new(
336                            native_track,
337                            id,
338                            participant_id.into(),
339                        ))
340                    })
341                    .collect()
342            }
343        }
344    }
345
346    pub fn remote_audio_track_publications(
347        &self,
348        participant_id: &str,
349    ) -> Vec<Arc<RemoteTrackPublication>> {
350        unsafe {
351            let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant(
352                self.native_room,
353                CFString::new(participant_id).as_concrete_TypeRef(),
354            );
355
356            if tracks.is_null() {
357                Vec::new()
358            } else {
359                let tracks = CFArray::wrap_under_get_rule(tracks);
360                tracks
361                    .into_iter()
362                    .map(|native_track_publication| {
363                        let native_track_publication = *native_track_publication;
364                        Arc::new(RemoteTrackPublication::new(native_track_publication))
365                    })
366                    .collect()
367            }
368        }
369    }
370
371    pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
372        let (tx, rx) = mpsc::unbounded();
373        self.remote_audio_track_subscribers.lock().push(tx);
374        rx
375    }
376
377    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
378        let (tx, rx) = mpsc::unbounded();
379        self.remote_video_track_subscribers.lock().push(tx);
380        rx
381    }
382
383    fn did_subscribe_to_remote_audio_track(
384        &self,
385        track: RemoteAudioTrack,
386        publication: RemoteTrackPublication,
387    ) {
388        let track = Arc::new(track);
389        let publication = Arc::new(publication);
390        self.remote_audio_track_subscribers.lock().retain(|tx| {
391            tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(
392                track.clone(),
393                publication.clone(),
394            ))
395            .is_ok()
396        });
397    }
398
399    fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
400        self.remote_audio_track_subscribers.lock().retain(|tx| {
401            tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
402                publisher_id: publisher_id.clone(),
403                track_id: track_id.clone(),
404            })
405            .is_ok()
406        });
407    }
408
409    fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
410        self.remote_audio_track_subscribers.lock().retain(|tx| {
411            tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
412                track_id: track_id.clone(),
413                muted,
414            })
415            .is_ok()
416        });
417    }
418
419    // A vec of publisher IDs
420    fn active_speakers_changed(&self, speakers: Vec<String>) {
421        self.remote_audio_track_subscribers
422            .lock()
423            .retain(move |tx| {
424                tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
425                    speakers: speakers.clone(),
426                })
427                .is_ok()
428            });
429    }
430
431    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
432        let track = Arc::new(track);
433        self.remote_video_track_subscribers.lock().retain(|tx| {
434            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
435                .is_ok()
436        });
437    }
438
439    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
440        self.remote_video_track_subscribers.lock().retain(|tx| {
441            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
442                publisher_id: publisher_id.clone(),
443                track_id: track_id.clone(),
444            })
445            .is_ok()
446        });
447    }
448
449    fn build_done_callback() -> (
450        extern "C" fn(*mut c_void, CFStringRef),
451        *mut c_void,
452        oneshot::Receiver<Result<()>>,
453    ) {
454        let (tx, rx) = oneshot::channel();
455        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
456            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
457            if error.is_null() {
458                let _ = tx.send(Ok(()));
459            } else {
460                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
461                let _ = tx.send(Err(anyhow!(error)));
462            }
463        }
464        (
465            done_callback,
466            Box::into_raw(Box::new(tx)) as *mut c_void,
467            rx,
468        )
469    }
470}
471
472impl Drop for Room {
473    fn drop(&mut self) {
474        unsafe {
475            LKRoomDisconnect(self.native_room);
476            CFRelease(self.native_room);
477        }
478    }
479}
480
481struct RoomDelegate {
482    native_delegate: *const c_void,
483    weak_room: *const Room,
484}
485
486impl RoomDelegate {
487    fn new(weak_room: Weak<Room>) -> Self {
488        let weak_room = Weak::into_raw(weak_room);
489        let native_delegate = unsafe {
490            LKRoomDelegateCreate(
491                weak_room as *mut c_void,
492                Self::on_did_disconnect,
493                Self::on_did_subscribe_to_remote_audio_track,
494                Self::on_did_unsubscribe_from_remote_audio_track,
495                Self::on_mute_change_from_remote_audio_track,
496                Self::on_active_speakers_changed,
497                Self::on_did_subscribe_to_remote_video_track,
498                Self::on_did_unsubscribe_from_remote_video_track,
499            )
500        };
501        Self {
502            native_delegate,
503            weak_room,
504        }
505    }
506
507    extern "C" fn on_did_disconnect(room: *mut c_void) {
508        let room = unsafe { Weak::from_raw(room as *mut Room) };
509        if let Some(room) = room.upgrade() {
510            room.did_disconnect();
511        }
512        let _ = Weak::into_raw(room);
513    }
514
515    extern "C" fn on_did_subscribe_to_remote_audio_track(
516        room: *mut c_void,
517        publisher_id: CFStringRef,
518        track_id: CFStringRef,
519        track: *const c_void,
520        publication: *const c_void,
521    ) {
522        let room = unsafe { Weak::from_raw(room as *mut Room) };
523        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
524        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
525        let track = RemoteAudioTrack::new(track, track_id, publisher_id);
526        let publication = RemoteTrackPublication::new(publication);
527        if let Some(room) = room.upgrade() {
528            room.did_subscribe_to_remote_audio_track(track, publication);
529        }
530        let _ = Weak::into_raw(room);
531    }
532
533    extern "C" fn on_did_unsubscribe_from_remote_audio_track(
534        room: *mut c_void,
535        publisher_id: CFStringRef,
536        track_id: CFStringRef,
537    ) {
538        let room = unsafe { Weak::from_raw(room as *mut Room) };
539        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
540        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
541        if let Some(room) = room.upgrade() {
542            room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
543        }
544        let _ = Weak::into_raw(room);
545    }
546
547    extern "C" fn on_mute_change_from_remote_audio_track(
548        room: *mut c_void,
549        track_id: CFStringRef,
550        muted: bool,
551    ) {
552        let room = unsafe { Weak::from_raw(room as *mut Room) };
553        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
554        if let Some(room) = room.upgrade() {
555            room.mute_changed_from_remote_audio_track(track_id, muted);
556        }
557        let _ = Weak::into_raw(room);
558    }
559
560    extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) {
561        if participants.is_null() {
562            return;
563        }
564
565        let room = unsafe { Weak::from_raw(room as *mut Room) };
566        let speakers = unsafe {
567            CFArray::wrap_under_get_rule(participants)
568                .into_iter()
569                .map(
570                    |speaker: core_foundation::base::ItemRef<'_, *const c_void>| {
571                        CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string()
572                    },
573                )
574                .collect()
575        };
576
577        if let Some(room) = room.upgrade() {
578            room.active_speakers_changed(speakers);
579        }
580        let _ = Weak::into_raw(room);
581    }
582
583    extern "C" fn on_did_subscribe_to_remote_video_track(
584        room: *mut c_void,
585        publisher_id: CFStringRef,
586        track_id: CFStringRef,
587        track: *const c_void,
588    ) {
589        let room = unsafe { Weak::from_raw(room as *mut Room) };
590        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
591        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
592        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
593        if let Some(room) = room.upgrade() {
594            room.did_subscribe_to_remote_video_track(track);
595        }
596        let _ = Weak::into_raw(room);
597    }
598
599    extern "C" fn on_did_unsubscribe_from_remote_video_track(
600        room: *mut c_void,
601        publisher_id: CFStringRef,
602        track_id: CFStringRef,
603    ) {
604        let room = unsafe { Weak::from_raw(room as *mut Room) };
605        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
606        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
607        if let Some(room) = room.upgrade() {
608            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
609        }
610        let _ = Weak::into_raw(room);
611    }
612}
613
614impl Drop for RoomDelegate {
615    fn drop(&mut self) {
616        unsafe {
617            CFRelease(self.native_delegate);
618            let _ = Weak::from_raw(self.weak_room);
619        }
620    }
621}
622
623pub struct LocalAudioTrack(*const c_void);
624unsafe impl Send for LocalAudioTrack {}
625
626impl LocalAudioTrack {
627    pub fn create() -> Self {
628        Self(unsafe { LKLocalAudioTrackCreateTrack() })
629    }
630}
631
632impl Drop for LocalAudioTrack {
633    fn drop(&mut self) {
634        unsafe { CFRelease(self.0) }
635    }
636}
637
638pub struct LocalVideoTrack(*const c_void);
639unsafe impl Send for LocalVideoTrack {}
640
641impl LocalVideoTrack {
642    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
643        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
644    }
645}
646
647impl Drop for LocalVideoTrack {
648    fn drop(&mut self) {
649        unsafe { CFRelease(self.0) }
650    }
651}
652
653pub struct LocalTrackPublication(*const c_void);
654unsafe impl Send for LocalTrackPublication {}
655
656impl LocalTrackPublication {
657    pub fn new(native_track_publication: *const c_void) -> Self {
658        unsafe {
659            CFRetain(native_track_publication);
660        }
661        Self(native_track_publication)
662    }
663
664    pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
665        let (tx, rx) = futures::channel::oneshot::channel();
666
667        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
668            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
669            if error.is_null() {
670                tx.send(Ok(())).ok();
671            } else {
672                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
673                tx.send(Err(anyhow!(error))).ok();
674            }
675        }
676
677        unsafe {
678            LKLocalTrackPublicationSetMute(
679                self.0,
680                muted,
681                complete_callback,
682                Box::into_raw(Box::new(tx)) as *mut c_void,
683            )
684        }
685
686        async move { rx.await.unwrap() }
687    }
688}
689
690impl Drop for LocalTrackPublication {
691    fn drop(&mut self) {
692        unsafe { CFRelease(self.0) }
693    }
694}
695
696pub struct RemoteTrackPublication(*const c_void);
697
698unsafe impl Send for RemoteTrackPublication {}
699
700impl RemoteTrackPublication {
701    pub fn new(native_track_publication: *const c_void) -> Self {
702        unsafe {
703            CFRetain(native_track_publication);
704        }
705        Self(native_track_publication)
706    }
707
708    pub fn sid(&self) -> String {
709        unsafe { CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid(self.0)).to_string() }
710    }
711
712    pub fn is_muted(&self) -> bool {
713        unsafe { LKRemoteTrackPublicationIsMuted(self.0) }
714    }
715
716    pub fn set_enabled(&self, enabled: bool) -> impl Future<Output = Result<()>> {
717        let (tx, rx) = futures::channel::oneshot::channel();
718
719        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
720            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
721            if error.is_null() {
722                tx.send(Ok(())).ok();
723            } else {
724                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
725                tx.send(Err(anyhow!(error))).ok();
726            }
727        }
728
729        unsafe {
730            LKRemoteTrackPublicationSetEnabled(
731                self.0,
732                enabled,
733                complete_callback,
734                Box::into_raw(Box::new(tx)) as *mut c_void,
735            )
736        }
737
738        async move { rx.await.unwrap() }
739    }
740}
741
742impl Drop for RemoteTrackPublication {
743    fn drop(&mut self) {
744        unsafe { CFRelease(self.0) }
745    }
746}
747
748#[derive(Debug)]
749pub struct RemoteAudioTrack {
750    _native_track: *const c_void,
751    sid: Sid,
752    publisher_id: String,
753}
754
755unsafe impl Send for RemoteAudioTrack {}
756
757impl RemoteAudioTrack {
758    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
759        unsafe {
760            CFRetain(native_track);
761        }
762        Self {
763            _native_track: native_track,
764            sid,
765            publisher_id,
766        }
767    }
768
769    pub fn sid(&self) -> &str {
770        &self.sid
771    }
772
773    pub fn publisher_id(&self) -> &str {
774        &self.publisher_id
775    }
776
777    pub fn enable(&self) -> impl Future<Output = Result<()>> {
778        async { Ok(()) }
779    }
780
781    pub fn disable(&self) -> impl Future<Output = Result<()>> {
782        async { Ok(()) }
783    }
784}
785
786#[derive(Debug)]
787pub struct RemoteVideoTrack {
788    native_track: *const c_void,
789    sid: Sid,
790    publisher_id: String,
791}
792
793unsafe impl Send for RemoteVideoTrack {}
794
795impl RemoteVideoTrack {
796    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
797        unsafe {
798            CFRetain(native_track);
799        }
800        Self {
801            native_track,
802            sid,
803            publisher_id,
804        }
805    }
806
807    pub fn sid(&self) -> &str {
808        &self.sid
809    }
810
811    pub fn publisher_id(&self) -> &str {
812        &self.publisher_id
813    }
814
815    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
816        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
817            unsafe {
818                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
819                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
820                let result = tx.try_broadcast(Frame(buffer));
821                let _ = Box::into_raw(tx);
822                match result {
823                    Ok(_) => true,
824                    Err(async_broadcast::TrySendError::Closed(_))
825                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
826                        log::warn!("no active receiver for frame");
827                        false
828                    }
829                    Err(async_broadcast::TrySendError::Full(_)) => {
830                        log::warn!("skipping frame as receiver is not keeping up");
831                        true
832                    }
833                }
834            }
835        }
836
837        extern "C" fn on_drop(callback_data: *mut c_void) {
838            unsafe {
839                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
840            }
841        }
842
843        let (tx, rx) = async_broadcast::broadcast(64);
844        unsafe {
845            let renderer = LKVideoRendererCreate(
846                Box::into_raw(Box::new(tx)) as *mut c_void,
847                on_frame,
848                on_drop,
849            );
850            LKVideoTrackAddRenderer(self.native_track, renderer);
851            rx
852        }
853    }
854}
855
856impl Drop for RemoteVideoTrack {
857    fn drop(&mut self) {
858        unsafe { CFRelease(self.native_track) }
859    }
860}
861
862pub enum RemoteVideoTrackUpdate {
863    Subscribed(Arc<RemoteVideoTrack>),
864    Unsubscribed { publisher_id: Sid, track_id: Sid },
865}
866
867pub enum RemoteAudioTrackUpdate {
868    ActiveSpeakersChanged { speakers: Vec<Sid> },
869    MuteChanged { track_id: Sid, muted: bool },
870    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
871    Unsubscribed { publisher_id: Sid, track_id: Sid },
872}
873
874pub struct MacOSDisplay(*const c_void);
875
876unsafe impl Send for MacOSDisplay {}
877
878impl MacOSDisplay {
879    fn new(ptr: *const c_void) -> Self {
880        unsafe {
881            CFRetain(ptr);
882        }
883        Self(ptr)
884    }
885}
886
887impl Drop for MacOSDisplay {
888    fn drop(&mut self) {
889        unsafe { CFRelease(self.0) }
890    }
891}
892
893#[derive(Clone)]
894pub struct Frame(CVImageBuffer);
895
896impl Frame {
897    pub fn width(&self) -> usize {
898        self.0.width()
899    }
900
901    pub fn height(&self) -> usize {
902        self.0.height()
903    }
904
905    pub fn image(&self) -> CVImageBuffer {
906        self.0.clone()
907    }
908}