prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use postage::watch;
 15use std::{
 16    ffi::c_void,
 17    sync::{Arc, Weak},
 18};
 19
 20extern "C" {
 21    fn LKRoomDelegateCreate(
 22        callback_data: *mut c_void,
 23        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
 24        on_did_subscribe_to_remote_audio_track: extern "C" fn(
 25            callback_data: *mut c_void,
 26            publisher_id: CFStringRef,
 27            track_id: CFStringRef,
 28            remote_track: *const c_void,
 29        ),
 30        on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
 31            callback_data: *mut c_void,
 32            publisher_id: CFStringRef,
 33            track_id: CFStringRef,
 34        ),
 35        on_mute_changed_from_remote_audio_track: extern "C" fn(
 36            callback_data: *mut c_void,
 37            track_id: CFStringRef,
 38            muted: bool,
 39        ),
 40        on_active_speakers_changed: extern "C" fn(
 41            callback_data: *mut c_void,
 42            participants: CFArrayRef,
 43        ),
 44        on_did_subscribe_to_remote_video_track: extern "C" fn(
 45            callback_data: *mut c_void,
 46            publisher_id: CFStringRef,
 47            track_id: CFStringRef,
 48            remote_track: *const c_void,
 49        ),
 50        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 51            callback_data: *mut c_void,
 52            publisher_id: CFStringRef,
 53            track_id: CFStringRef,
 54        ),
 55    ) -> *const c_void;
 56
 57    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
 58    fn LKRoomConnect(
 59        room: *const c_void,
 60        url: CFStringRef,
 61        token: CFStringRef,
 62        callback: extern "C" fn(*mut c_void, CFStringRef),
 63        callback_data: *mut c_void,
 64    );
 65    fn LKRoomDisconnect(room: *const c_void);
 66    fn LKRoomPublishVideoTrack(
 67        room: *const c_void,
 68        track: *const c_void,
 69        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 70        callback_data: *mut c_void,
 71    );
 72    fn LKRoomPublishAudioTrack(
 73        room: *const c_void,
 74        track: *const c_void,
 75        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 76        callback_data: *mut c_void,
 77    );
 78    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
 79    fn LKRoomAudioTracksForRemoteParticipant(
 80        room: *const c_void,
 81        participant_id: CFStringRef,
 82    ) -> CFArrayRef;
 83
 84    fn LKRoomAudioTrackPublicationsForRemoteParticipant(
 85        room: *const c_void,
 86        participant_id: CFStringRef,
 87    ) -> CFArrayRef;
 88
 89    fn LKRoomVideoTracksForRemoteParticipant(
 90        room: *const c_void,
 91        participant_id: CFStringRef,
 92    ) -> CFArrayRef;
 93
 94    fn LKVideoRendererCreate(
 95        callback_data: *mut c_void,
 96        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
 97        on_drop: extern "C" fn(callback_data: *mut c_void),
 98    ) -> *const c_void;
 99
100    fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef;
101    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
102    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
103
104    fn LKDisplaySources(
105        callback_data: *mut c_void,
106        callback: extern "C" fn(
107            callback_data: *mut c_void,
108            sources: CFArrayRef,
109            error: CFStringRef,
110        ),
111    );
112    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
113    fn LKLocalAudioTrackCreateTrack() -> *const c_void;
114
115    fn LKLocalTrackPublicationSetMute(
116        publication: *const c_void,
117        muted: bool,
118        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
119        callback_data: *mut c_void,
120    );
121
122    fn LKRemoteTrackPublicationSetEnabled(
123        publication: *const c_void,
124        enabled: bool,
125        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
126        callback_data: *mut c_void,
127    );
128}
129
130pub type Sid = String;
131
132#[derive(Clone, Eq, PartialEq)]
133pub enum ConnectionState {
134    Disconnected,
135    Connected { url: String, token: String },
136}
137
138pub struct Room {
139    native_room: *const c_void,
140    connection: Mutex<(
141        watch::Sender<ConnectionState>,
142        watch::Receiver<ConnectionState>,
143    )>,
144    remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
145    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
146    _delegate: RoomDelegate,
147}
148
149impl Room {
150    pub fn new() -> Arc<Self> {
151        Arc::new_cyclic(|weak_room| {
152            let delegate = RoomDelegate::new(weak_room.clone());
153            Self {
154                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
155                connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
156                remote_audio_track_subscribers: Default::default(),
157                remote_video_track_subscribers: Default::default(),
158                _delegate: delegate,
159            }
160        })
161    }
162
163    pub fn status(&self) -> watch::Receiver<ConnectionState> {
164        self.connection.lock().1.clone()
165    }
166
167    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
168        let url = CFString::new(url);
169        let token = CFString::new(token);
170        let (did_connect, tx, rx) = Self::build_done_callback();
171        unsafe {
172            LKRoomConnect(
173                self.native_room,
174                url.as_concrete_TypeRef(),
175                token.as_concrete_TypeRef(),
176                did_connect,
177                tx,
178            )
179        }
180
181        let this = self.clone();
182        let url = url.to_string();
183        let token = token.to_string();
184        async move {
185            rx.await.unwrap().context("error connecting to room")?;
186            *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
187            Ok(())
188        }
189    }
190
191    fn did_disconnect(&self) {
192        *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
193    }
194
195    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
196        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
197            unsafe {
198                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
199
200                if sources.is_null() {
201                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
202                } else {
203                    let sources = CFArray::wrap_under_get_rule(sources)
204                        .into_iter()
205                        .map(|source| MacOSDisplay::new(*source))
206                        .collect();
207
208                    let _ = tx.send(Ok(sources));
209                }
210            }
211        }
212
213        let (tx, rx) = oneshot::channel();
214
215        unsafe {
216            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
217        }
218
219        async move { rx.await.unwrap() }
220    }
221
222    pub fn publish_video_track(
223        self: &Arc<Self>,
224        track: &LocalVideoTrack,
225    ) -> impl Future<Output = Result<LocalTrackPublication>> {
226        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
227        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
228            let tx =
229                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
230            if error.is_null() {
231                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
232            } else {
233                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
234                let _ = tx.send(Err(anyhow!(error)));
235            }
236        }
237        unsafe {
238            LKRoomPublishVideoTrack(
239                self.native_room,
240                track.0,
241                callback,
242                Box::into_raw(Box::new(tx)) as *mut c_void,
243            );
244        }
245        async { rx.await.unwrap().context("error publishing video track") }
246    }
247
248    pub fn publish_audio_track(
249        self: &Arc<Self>,
250        track: &LocalAudioTrack,
251    ) -> impl Future<Output = Result<LocalTrackPublication>> {
252        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
253        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
254            let tx =
255                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
256            if error.is_null() {
257                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
258            } else {
259                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
260                let _ = tx.send(Err(anyhow!(error)));
261            }
262        }
263        unsafe {
264            LKRoomPublishAudioTrack(
265                self.native_room,
266                track.0,
267                callback,
268                Box::into_raw(Box::new(tx)) as *mut c_void,
269            );
270        }
271        async { rx.await.unwrap().context("error publishing audio track") }
272    }
273
274    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
275        unsafe {
276            LKRoomUnpublishTrack(self.native_room, publication.0);
277        }
278    }
279
280    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
281        unsafe {
282            let tracks = LKRoomVideoTracksForRemoteParticipant(
283                self.native_room,
284                CFString::new(participant_id).as_concrete_TypeRef(),
285            );
286
287            if tracks.is_null() {
288                Vec::new()
289            } else {
290                let tracks = CFArray::wrap_under_get_rule(tracks);
291                tracks
292                    .into_iter()
293                    .map(|native_track| {
294                        let native_track = *native_track;
295                        let id =
296                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
297                                .to_string();
298                        Arc::new(RemoteVideoTrack::new(
299                            native_track,
300                            id,
301                            participant_id.into(),
302                        ))
303                    })
304                    .collect()
305            }
306        }
307    }
308
309    pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
310        unsafe {
311            let tracks = LKRoomAudioTracksForRemoteParticipant(
312                self.native_room,
313                CFString::new(participant_id).as_concrete_TypeRef(),
314            );
315
316            if tracks.is_null() {
317                Vec::new()
318            } else {
319                let tracks = CFArray::wrap_under_get_rule(tracks);
320                tracks
321                    .into_iter()
322                    .map(|native_track| {
323                        let native_track = *native_track;
324                        let id =
325                            CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
326                                .to_string();
327                        Arc::new(RemoteAudioTrack::new(
328                            native_track,
329                            id,
330                            participant_id.into(),
331                        ))
332                    })
333                    .collect()
334            }
335        }
336    }
337
338    pub fn remote_audio_track_publications(
339        &self,
340        participant_id: &str,
341    ) -> Vec<Arc<RemoteTrackPublication>> {
342        unsafe {
343            let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant(
344                self.native_room,
345                CFString::new(participant_id).as_concrete_TypeRef(),
346            );
347
348            if tracks.is_null() {
349                Vec::new()
350            } else {
351                let tracks = CFArray::wrap_under_get_rule(tracks);
352                tracks
353                    .into_iter()
354                    .map(|native_track_publication| {
355                        let native_track_publication = *native_track_publication;
356                        Arc::new(RemoteTrackPublication::new(native_track_publication))
357                    })
358                    .collect()
359            }
360        }
361    }
362
363    pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
364        let (tx, rx) = mpsc::unbounded();
365        self.remote_audio_track_subscribers.lock().push(tx);
366        rx
367    }
368
369    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
370        let (tx, rx) = mpsc::unbounded();
371        self.remote_video_track_subscribers.lock().push(tx);
372        rx
373    }
374
375    fn did_subscribe_to_remote_audio_track(&self, track: RemoteAudioTrack) {
376        let track = Arc::new(track);
377        self.remote_audio_track_subscribers.lock().retain(|tx| {
378            tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(track.clone()))
379                .is_ok()
380        });
381    }
382
383    fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
384        self.remote_audio_track_subscribers.lock().retain(|tx| {
385            tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
386                publisher_id: publisher_id.clone(),
387                track_id: track_id.clone(),
388            })
389            .is_ok()
390        });
391    }
392
393    fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
394        self.remote_audio_track_subscribers.lock().retain(|tx| {
395            tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
396                track_id: track_id.clone(),
397                muted,
398            })
399            .is_ok()
400        });
401    }
402
403    // A vec of publisher IDs
404    fn active_speakers_changed(&self, speakers: Vec<String>) {
405        self.remote_audio_track_subscribers
406            .lock()
407            .retain(move |tx| {
408                tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
409                    speakers: speakers.clone(),
410                })
411                .is_ok()
412            });
413    }
414
415    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
416        let track = Arc::new(track);
417        self.remote_video_track_subscribers.lock().retain(|tx| {
418            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
419                .is_ok()
420        });
421    }
422
423    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
424        self.remote_video_track_subscribers.lock().retain(|tx| {
425            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
426                publisher_id: publisher_id.clone(),
427                track_id: track_id.clone(),
428            })
429            .is_ok()
430        });
431    }
432
433    fn build_done_callback() -> (
434        extern "C" fn(*mut c_void, CFStringRef),
435        *mut c_void,
436        oneshot::Receiver<Result<()>>,
437    ) {
438        let (tx, rx) = oneshot::channel();
439        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
440            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
441            if error.is_null() {
442                let _ = tx.send(Ok(()));
443            } else {
444                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
445                let _ = tx.send(Err(anyhow!(error)));
446            }
447        }
448        (
449            done_callback,
450            Box::into_raw(Box::new(tx)) as *mut c_void,
451            rx,
452        )
453    }
454}
455
456impl Drop for Room {
457    fn drop(&mut self) {
458        unsafe {
459            LKRoomDisconnect(self.native_room);
460            CFRelease(self.native_room);
461        }
462    }
463}
464
465struct RoomDelegate {
466    native_delegate: *const c_void,
467    weak_room: *const Room,
468}
469
470impl RoomDelegate {
471    fn new(weak_room: Weak<Room>) -> Self {
472        let weak_room = Weak::into_raw(weak_room);
473        let native_delegate = unsafe {
474            LKRoomDelegateCreate(
475                weak_room as *mut c_void,
476                Self::on_did_disconnect,
477                Self::on_did_subscribe_to_remote_audio_track,
478                Self::on_did_unsubscribe_from_remote_audio_track,
479                Self::on_mute_change_from_remote_audio_track,
480                Self::on_active_speakers_changed,
481                Self::on_did_subscribe_to_remote_video_track,
482                Self::on_did_unsubscribe_from_remote_video_track,
483            )
484        };
485        Self {
486            native_delegate,
487            weak_room,
488        }
489    }
490
491    extern "C" fn on_did_disconnect(room: *mut c_void) {
492        let room = unsafe { Weak::from_raw(room as *mut Room) };
493        if let Some(room) = room.upgrade() {
494            room.did_disconnect();
495        }
496        let _ = Weak::into_raw(room);
497    }
498
499    extern "C" fn on_did_subscribe_to_remote_audio_track(
500        room: *mut c_void,
501        publisher_id: CFStringRef,
502        track_id: CFStringRef,
503        track: *const c_void,
504    ) {
505        let room = unsafe { Weak::from_raw(room as *mut Room) };
506        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
507        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
508        let track = RemoteAudioTrack::new(track, track_id, publisher_id);
509        if let Some(room) = room.upgrade() {
510            room.did_subscribe_to_remote_audio_track(track);
511        }
512        let _ = Weak::into_raw(room);
513    }
514
515    extern "C" fn on_did_unsubscribe_from_remote_audio_track(
516        room: *mut c_void,
517        publisher_id: CFStringRef,
518        track_id: CFStringRef,
519    ) {
520        let room = unsafe { Weak::from_raw(room as *mut Room) };
521        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
522        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
523        if let Some(room) = room.upgrade() {
524            room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
525        }
526        let _ = Weak::into_raw(room);
527    }
528
529    extern "C" fn on_mute_change_from_remote_audio_track(
530        room: *mut c_void,
531        track_id: CFStringRef,
532        muted: bool,
533    ) {
534        let room = unsafe { Weak::from_raw(room as *mut Room) };
535        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
536        if let Some(room) = room.upgrade() {
537            room.mute_changed_from_remote_audio_track(track_id, muted);
538        }
539        let _ = Weak::into_raw(room);
540    }
541
542    extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) {
543        if participants.is_null() {
544            return;
545        }
546
547        let room = unsafe { Weak::from_raw(room as *mut Room) };
548        let speakers = unsafe {
549            CFArray::wrap_under_get_rule(participants)
550                .into_iter()
551                .map(
552                    |speaker: core_foundation::base::ItemRef<'_, *const c_void>| {
553                        CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string()
554                    },
555                )
556                .collect()
557        };
558
559        if let Some(room) = room.upgrade() {
560            room.active_speakers_changed(speakers);
561        }
562        let _ = Weak::into_raw(room);
563    }
564
565    extern "C" fn on_did_subscribe_to_remote_video_track(
566        room: *mut c_void,
567        publisher_id: CFStringRef,
568        track_id: CFStringRef,
569        track: *const c_void,
570    ) {
571        let room = unsafe { Weak::from_raw(room as *mut Room) };
572        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
573        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
574        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
575        if let Some(room) = room.upgrade() {
576            room.did_subscribe_to_remote_video_track(track);
577        }
578        let _ = Weak::into_raw(room);
579    }
580
581    extern "C" fn on_did_unsubscribe_from_remote_video_track(
582        room: *mut c_void,
583        publisher_id: CFStringRef,
584        track_id: CFStringRef,
585    ) {
586        let room = unsafe { Weak::from_raw(room as *mut Room) };
587        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
588        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
589        if let Some(room) = room.upgrade() {
590            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
591        }
592        let _ = Weak::into_raw(room);
593    }
594}
595
596impl Drop for RoomDelegate {
597    fn drop(&mut self) {
598        unsafe {
599            CFRelease(self.native_delegate);
600            let _ = Weak::from_raw(self.weak_room);
601        }
602    }
603}
604
605pub struct LocalAudioTrack(*const c_void);
606
607impl LocalAudioTrack {
608    pub fn create() -> Self {
609        Self(unsafe { LKLocalAudioTrackCreateTrack() })
610    }
611}
612
613impl Drop for LocalAudioTrack {
614    fn drop(&mut self) {
615        unsafe { CFRelease(self.0) }
616    }
617}
618
619pub struct LocalVideoTrack(*const c_void);
620
621impl LocalVideoTrack {
622    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
623        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
624    }
625}
626
627impl Drop for LocalVideoTrack {
628    fn drop(&mut self) {
629        unsafe { CFRelease(self.0) }
630    }
631}
632
633pub struct LocalTrackPublication(*const c_void);
634
635impl LocalTrackPublication {
636    pub fn new(native_track_publication: *const c_void) -> Self {
637        unsafe {
638            CFRetain(native_track_publication);
639        }
640        Self(native_track_publication)
641    }
642
643    pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
644        let (tx, rx) = futures::channel::oneshot::channel();
645
646        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
647            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
648            if error.is_null() {
649                tx.send(Ok(())).ok();
650            } else {
651                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
652                tx.send(Err(anyhow!(error))).ok();
653            }
654        }
655
656        unsafe {
657            LKLocalTrackPublicationSetMute(
658                self.0,
659                muted,
660                complete_callback,
661                Box::into_raw(Box::new(tx)) as *mut c_void,
662            )
663        }
664
665        async move { rx.await.unwrap() }
666    }
667}
668
669impl Drop for LocalTrackPublication {
670    fn drop(&mut self) {
671        unsafe { CFRelease(self.0) }
672    }
673}
674
675pub struct RemoteTrackPublication(*const c_void);
676
677impl RemoteTrackPublication {
678    pub fn new(native_track_publication: *const c_void) -> Self {
679        unsafe {
680            CFRetain(native_track_publication);
681        }
682        Self(native_track_publication)
683    }
684
685    pub fn set_enabled(&self, enabled: bool) -> impl Future<Output = Result<()>> {
686        let (tx, rx) = futures::channel::oneshot::channel();
687
688        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
689            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
690            if error.is_null() {
691                tx.send(Ok(())).ok();
692            } else {
693                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
694                tx.send(Err(anyhow!(error))).ok();
695            }
696        }
697
698        unsafe {
699            LKRemoteTrackPublicationSetEnabled(
700                self.0,
701                enabled,
702                complete_callback,
703                Box::into_raw(Box::new(tx)) as *mut c_void,
704            )
705        }
706
707        async move { rx.await.unwrap() }
708    }
709}
710
711impl Drop for RemoteTrackPublication {
712    fn drop(&mut self) {
713        unsafe { CFRelease(self.0) }
714    }
715}
716
717#[derive(Debug)]
718pub struct RemoteAudioTrack {
719    _native_track: *const c_void,
720    sid: Sid,
721    publisher_id: String,
722}
723
724impl RemoteAudioTrack {
725    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
726        unsafe {
727            CFRetain(native_track);
728        }
729        Self {
730            _native_track: native_track,
731            sid,
732            publisher_id,
733        }
734    }
735
736    pub fn sid(&self) -> &str {
737        &self.sid
738    }
739
740    pub fn publisher_id(&self) -> &str {
741        &self.publisher_id
742    }
743
744    pub fn enable(&self) -> impl Future<Output = Result<()>> {
745        async { Ok(()) }
746    }
747
748    pub fn disable(&self) -> impl Future<Output = Result<()>> {
749        async { Ok(()) }
750    }
751}
752
753#[derive(Debug)]
754pub struct RemoteVideoTrack {
755    native_track: *const c_void,
756    sid: Sid,
757    publisher_id: String,
758}
759
760impl RemoteVideoTrack {
761    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
762        unsafe {
763            CFRetain(native_track);
764        }
765        Self {
766            native_track,
767            sid,
768            publisher_id,
769        }
770    }
771
772    pub fn sid(&self) -> &str {
773        &self.sid
774    }
775
776    pub fn publisher_id(&self) -> &str {
777        &self.publisher_id
778    }
779
780    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
781        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
782            unsafe {
783                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
784                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
785                let result = tx.try_broadcast(Frame(buffer));
786                let _ = Box::into_raw(tx);
787                match result {
788                    Ok(_) => true,
789                    Err(async_broadcast::TrySendError::Closed(_))
790                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
791                        log::warn!("no active receiver for frame");
792                        false
793                    }
794                    Err(async_broadcast::TrySendError::Full(_)) => {
795                        log::warn!("skipping frame as receiver is not keeping up");
796                        true
797                    }
798                }
799            }
800        }
801
802        extern "C" fn on_drop(callback_data: *mut c_void) {
803            unsafe {
804                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
805            }
806        }
807
808        let (tx, rx) = async_broadcast::broadcast(64);
809        unsafe {
810            let renderer = LKVideoRendererCreate(
811                Box::into_raw(Box::new(tx)) as *mut c_void,
812                on_frame,
813                on_drop,
814            );
815            LKVideoTrackAddRenderer(self.native_track, renderer);
816            rx
817        }
818    }
819}
820
821impl Drop for RemoteVideoTrack {
822    fn drop(&mut self) {
823        unsafe { CFRelease(self.native_track) }
824    }
825}
826
827pub enum RemoteVideoTrackUpdate {
828    Subscribed(Arc<RemoteVideoTrack>),
829    Unsubscribed { publisher_id: Sid, track_id: Sid },
830}
831
832pub enum RemoteAudioTrackUpdate {
833    ActiveSpeakersChanged { speakers: Vec<Sid> },
834    MuteChanged { track_id: Sid, muted: bool },
835    Subscribed(Arc<RemoteAudioTrack>),
836    Unsubscribed { publisher_id: Sid, track_id: Sid },
837}
838
839pub struct MacOSDisplay(*const c_void);
840
841impl MacOSDisplay {
842    fn new(ptr: *const c_void) -> Self {
843        unsafe {
844            CFRetain(ptr);
845        }
846        Self(ptr)
847    }
848}
849
850impl Drop for MacOSDisplay {
851    fn drop(&mut self) {
852        unsafe { CFRelease(self.0) }
853    }
854}
855
856#[derive(Clone)]
857pub struct Frame(CVImageBuffer);
858
859impl Frame {
860    pub fn width(&self) -> usize {
861        self.0.width()
862    }
863
864    pub fn height(&self) -> usize {
865        self.0.height()
866    }
867
868    pub fn image(&self) -> CVImageBuffer {
869        self.0.clone()
870    }
871}