prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use postage::watch;
 15use std::{
 16    ffi::c_void,
 17    sync::{Arc, Weak},
 18};
 19
 20extern "C" {
 21    fn LKRoomDelegateCreate(
 22        callback_data: *mut c_void,
 23        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
 24        on_did_subscribe_to_remote_audio_track: extern "C" fn(
 25            callback_data: *mut c_void,
 26            publisher_id: CFStringRef,
 27            track_id: CFStringRef,
 28            remote_track: *const c_void,
 29            remote_publication: *const c_void,
 30        ),
 31        on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
 32            callback_data: *mut c_void,
 33            publisher_id: CFStringRef,
 34            track_id: CFStringRef,
 35        ),
 36        on_mute_changed_from_remote_audio_track: extern "C" fn(
 37            callback_data: *mut c_void,
 38            track_id: CFStringRef,
 39            muted: bool,
 40        ),
 41        on_active_speakers_changed: extern "C" fn(
 42            callback_data: *mut c_void,
 43            participants: CFArrayRef,
 44        ),
 45        on_did_subscribe_to_remote_video_track: extern "C" fn(
 46            callback_data: *mut c_void,
 47            publisher_id: CFStringRef,
 48            track_id: CFStringRef,
 49            remote_track: *const c_void,
 50        ),
 51        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 52            callback_data: *mut c_void,
 53            publisher_id: CFStringRef,
 54            track_id: CFStringRef,
 55        ),
 56    ) -> *const c_void;
 57
 58    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
 59    fn LKRoomConnect(
 60        room: *const c_void,
 61        url: CFStringRef,
 62        token: CFStringRef,
 63        callback: extern "C" fn(*mut c_void, CFStringRef),
 64        callback_data: *mut c_void,
 65    );
 66    fn LKRoomDisconnect(room: *const c_void);
 67    fn LKRoomPublishVideoTrack(
 68        room: *const c_void,
 69        track: *const c_void,
 70        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 71        callback_data: *mut c_void,
 72    );
 73    fn LKRoomPublishAudioTrack(
 74        room: *const c_void,
 75        track: *const c_void,
 76        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 77        callback_data: *mut c_void,
 78    );
 79    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
 80    fn LKRoomAudioTracksForRemoteParticipant(
 81        room: *const c_void,
 82        participant_id: CFStringRef,
 83    ) -> CFArrayRef;
 84
 85    fn LKRoomAudioTrackPublicationsForRemoteParticipant(
 86        room: *const c_void,
 87        participant_id: CFStringRef,
 88    ) -> CFArrayRef;
 89
 90    fn LKRoomVideoTracksForRemoteParticipant(
 91        room: *const c_void,
 92        participant_id: CFStringRef,
 93    ) -> CFArrayRef;
 94
 95    fn LKVideoRendererCreate(
 96        callback_data: *mut c_void,
 97        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
 98        on_drop: extern "C" fn(callback_data: *mut c_void),
 99    ) -> *const c_void;
100
101    fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef;
102    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
103    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
104
105    fn LKDisplaySources(
106        callback_data: *mut c_void,
107        callback: extern "C" fn(
108            callback_data: *mut c_void,
109            sources: CFArrayRef,
110            error: CFStringRef,
111        ),
112    );
113    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
114    fn LKLocalAudioTrackCreateTrack() -> *const c_void;
115
116    fn LKLocalTrackPublicationSetMute(
117        publication: *const c_void,
118        muted: bool,
119        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
120        callback_data: *mut c_void,
121    );
122
123    fn LKRemoteTrackPublicationSetEnabled(
124        publication: *const c_void,
125        enabled: bool,
126        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
127        callback_data: *mut c_void,
128    );
129
130    fn LKRemoteTrackPublicationIsMuted(publication: *const c_void) -> bool;
131    fn LKRemoteTrackPublicationGetSid(publication: *const c_void) -> CFStringRef;
132}
133
134pub type Sid = String;
135
136#[derive(Clone, Eq, PartialEq)]
137pub enum ConnectionState {
138    Disconnected,
139    Connected { url: String, token: String },
140}
141
142pub struct Room {
143    native_room: *const c_void,
144    connection: Mutex<(
145        watch::Sender<ConnectionState>,
146        watch::Receiver<ConnectionState>,
147    )>,
148    remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
149    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
150    _delegate: RoomDelegate,
151}
152
153impl Room {
154    pub fn new() -> Arc<Self> {
155        Arc::new_cyclic(|weak_room| {
156            let delegate = RoomDelegate::new(weak_room.clone());
157            Self {
158                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
159                connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
160                remote_audio_track_subscribers: Default::default(),
161                remote_video_track_subscribers: Default::default(),
162                _delegate: delegate,
163            }
164        })
165    }
166
167    pub fn status(&self) -> watch::Receiver<ConnectionState> {
168        self.connection.lock().1.clone()
169    }
170
171    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
172        let url = CFString::new(url);
173        let token = CFString::new(token);
174        let (did_connect, tx, rx) = Self::build_done_callback();
175        unsafe {
176            LKRoomConnect(
177                self.native_room,
178                url.as_concrete_TypeRef(),
179                token.as_concrete_TypeRef(),
180                did_connect,
181                tx,
182            )
183        }
184
185        let this = self.clone();
186        let url = url.to_string();
187        let token = token.to_string();
188        async move {
189            rx.await.unwrap().context("error connecting to room")?;
190            *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
191            Ok(())
192        }
193    }
194
195    fn did_disconnect(&self) {
196        *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
197    }
198
199    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
200        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
201            unsafe {
202                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
203
204                if sources.is_null() {
205                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
206                } else {
207                    let sources = CFArray::wrap_under_get_rule(sources)
208                        .into_iter()
209                        .map(|source| MacOSDisplay::new(*source))
210                        .collect();
211
212                    let _ = tx.send(Ok(sources));
213                }
214            }
215        }
216
217        let (tx, rx) = oneshot::channel();
218
219        unsafe {
220            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
221        }
222
223        async move { rx.await.unwrap() }
224    }
225
226    pub fn publish_video_track(
227        self: &Arc<Self>,
228        track: &LocalVideoTrack,
229    ) -> impl Future<Output = Result<LocalTrackPublication>> {
230        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
231        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
232            let tx =
233                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
234            if error.is_null() {
235                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
236            } else {
237                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
238                let _ = tx.send(Err(anyhow!(error)));
239            }
240        }
241        unsafe {
242            LKRoomPublishVideoTrack(
243                self.native_room,
244                track.0,
245                callback,
246                Box::into_raw(Box::new(tx)) as *mut c_void,
247            );
248        }
249        async { rx.await.unwrap().context("error publishing video track") }
250    }
251
252    pub fn publish_audio_track(
253        self: &Arc<Self>,
254        track: &LocalAudioTrack,
255    ) -> impl Future<Output = Result<LocalTrackPublication>> {
256        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
257        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
258            let tx =
259                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
260            if error.is_null() {
261                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
262            } else {
263                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
264                let _ = tx.send(Err(anyhow!(error)));
265            }
266        }
267        unsafe {
268            LKRoomPublishAudioTrack(
269                self.native_room,
270                track.0,
271                callback,
272                Box::into_raw(Box::new(tx)) as *mut c_void,
273            );
274        }
275        async { rx.await.unwrap().context("error publishing audio track") }
276    }
277
278    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
279        unsafe {
280            LKRoomUnpublishTrack(self.native_room, publication.0);
281        }
282    }
283
284    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
285        unsafe {
286            let tracks = LKRoomVideoTracksForRemoteParticipant(
287                self.native_room,
288                CFString::new(participant_id).as_concrete_TypeRef(),
289            );
290
291            if tracks.is_null() {
292                Vec::new()
293            } else {
294                let tracks = CFArray::wrap_under_get_rule(tracks);
295                tracks
296                    .into_iter()
297                    .map(|native_track| {
298                        let native_track = *native_track;
299                        let id =
300                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
301                                .to_string();
302                        Arc::new(RemoteVideoTrack::new(
303                            native_track,
304                            id,
305                            participant_id.into(),
306                        ))
307                    })
308                    .collect()
309            }
310        }
311    }
312
313    pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
314        unsafe {
315            let tracks = LKRoomAudioTracksForRemoteParticipant(
316                self.native_room,
317                CFString::new(participant_id).as_concrete_TypeRef(),
318            );
319
320            if tracks.is_null() {
321                Vec::new()
322            } else {
323                let tracks = CFArray::wrap_under_get_rule(tracks);
324                tracks
325                    .into_iter()
326                    .map(|native_track| {
327                        let native_track = *native_track;
328                        let id =
329                            CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
330                                .to_string();
331                        Arc::new(RemoteAudioTrack::new(
332                            native_track,
333                            id,
334                            participant_id.into(),
335                        ))
336                    })
337                    .collect()
338            }
339        }
340    }
341
342    pub fn remote_audio_track_publications(
343        &self,
344        participant_id: &str,
345    ) -> Vec<Arc<RemoteTrackPublication>> {
346        unsafe {
347            let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant(
348                self.native_room,
349                CFString::new(participant_id).as_concrete_TypeRef(),
350            );
351
352            if tracks.is_null() {
353                Vec::new()
354            } else {
355                let tracks = CFArray::wrap_under_get_rule(tracks);
356                tracks
357                    .into_iter()
358                    .map(|native_track_publication| {
359                        let native_track_publication = *native_track_publication;
360                        Arc::new(RemoteTrackPublication::new(native_track_publication))
361                    })
362                    .collect()
363            }
364        }
365    }
366
367    pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
368        let (tx, rx) = mpsc::unbounded();
369        self.remote_audio_track_subscribers.lock().push(tx);
370        rx
371    }
372
373    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
374        let (tx, rx) = mpsc::unbounded();
375        self.remote_video_track_subscribers.lock().push(tx);
376        rx
377    }
378
379    fn did_subscribe_to_remote_audio_track(
380        &self,
381        track: RemoteAudioTrack,
382        publication: RemoteTrackPublication,
383    ) {
384        let track = Arc::new(track);
385        let publication = Arc::new(publication);
386        self.remote_audio_track_subscribers.lock().retain(|tx| {
387            tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(
388                track.clone(),
389                publication.clone(),
390            ))
391            .is_ok()
392        });
393    }
394
395    fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
396        self.remote_audio_track_subscribers.lock().retain(|tx| {
397            tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
398                publisher_id: publisher_id.clone(),
399                track_id: track_id.clone(),
400            })
401            .is_ok()
402        });
403    }
404
405    fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
406        self.remote_audio_track_subscribers.lock().retain(|tx| {
407            tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
408                track_id: track_id.clone(),
409                muted,
410            })
411            .is_ok()
412        });
413    }
414
415    // A vec of publisher IDs
416    fn active_speakers_changed(&self, speakers: Vec<String>) {
417        self.remote_audio_track_subscribers
418            .lock()
419            .retain(move |tx| {
420                tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
421                    speakers: speakers.clone(),
422                })
423                .is_ok()
424            });
425    }
426
427    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
428        let track = Arc::new(track);
429        self.remote_video_track_subscribers.lock().retain(|tx| {
430            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
431                .is_ok()
432        });
433    }
434
435    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
436        self.remote_video_track_subscribers.lock().retain(|tx| {
437            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
438                publisher_id: publisher_id.clone(),
439                track_id: track_id.clone(),
440            })
441            .is_ok()
442        });
443    }
444
445    fn build_done_callback() -> (
446        extern "C" fn(*mut c_void, CFStringRef),
447        *mut c_void,
448        oneshot::Receiver<Result<()>>,
449    ) {
450        let (tx, rx) = oneshot::channel();
451        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
452            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
453            if error.is_null() {
454                let _ = tx.send(Ok(()));
455            } else {
456                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
457                let _ = tx.send(Err(anyhow!(error)));
458            }
459        }
460        (
461            done_callback,
462            Box::into_raw(Box::new(tx)) as *mut c_void,
463            rx,
464        )
465    }
466}
467
468impl Drop for Room {
469    fn drop(&mut self) {
470        unsafe {
471            LKRoomDisconnect(self.native_room);
472            CFRelease(self.native_room);
473        }
474    }
475}
476
477struct RoomDelegate {
478    native_delegate: *const c_void,
479    weak_room: *const Room,
480}
481
482impl RoomDelegate {
483    fn new(weak_room: Weak<Room>) -> Self {
484        let weak_room = Weak::into_raw(weak_room);
485        let native_delegate = unsafe {
486            LKRoomDelegateCreate(
487                weak_room as *mut c_void,
488                Self::on_did_disconnect,
489                Self::on_did_subscribe_to_remote_audio_track,
490                Self::on_did_unsubscribe_from_remote_audio_track,
491                Self::on_mute_change_from_remote_audio_track,
492                Self::on_active_speakers_changed,
493                Self::on_did_subscribe_to_remote_video_track,
494                Self::on_did_unsubscribe_from_remote_video_track,
495            )
496        };
497        Self {
498            native_delegate,
499            weak_room,
500        }
501    }
502
503    extern "C" fn on_did_disconnect(room: *mut c_void) {
504        let room = unsafe { Weak::from_raw(room as *mut Room) };
505        if let Some(room) = room.upgrade() {
506            room.did_disconnect();
507        }
508        let _ = Weak::into_raw(room);
509    }
510
511    extern "C" fn on_did_subscribe_to_remote_audio_track(
512        room: *mut c_void,
513        publisher_id: CFStringRef,
514        track_id: CFStringRef,
515        track: *const c_void,
516        publication: *const c_void,
517    ) {
518        let room = unsafe { Weak::from_raw(room as *mut Room) };
519        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
520        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
521        let track = RemoteAudioTrack::new(track, track_id, publisher_id);
522        let publication = RemoteTrackPublication::new(publication);
523        if let Some(room) = room.upgrade() {
524            room.did_subscribe_to_remote_audio_track(track, publication);
525        }
526        let _ = Weak::into_raw(room);
527    }
528
529    extern "C" fn on_did_unsubscribe_from_remote_audio_track(
530        room: *mut c_void,
531        publisher_id: CFStringRef,
532        track_id: CFStringRef,
533    ) {
534        let room = unsafe { Weak::from_raw(room as *mut Room) };
535        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
536        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
537        if let Some(room) = room.upgrade() {
538            room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
539        }
540        let _ = Weak::into_raw(room);
541    }
542
543    extern "C" fn on_mute_change_from_remote_audio_track(
544        room: *mut c_void,
545        track_id: CFStringRef,
546        muted: bool,
547    ) {
548        let room = unsafe { Weak::from_raw(room as *mut Room) };
549        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
550        if let Some(room) = room.upgrade() {
551            room.mute_changed_from_remote_audio_track(track_id, muted);
552        }
553        let _ = Weak::into_raw(room);
554    }
555
556    extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) {
557        if participants.is_null() {
558            return;
559        }
560
561        let room = unsafe { Weak::from_raw(room as *mut Room) };
562        let speakers = unsafe {
563            CFArray::wrap_under_get_rule(participants)
564                .into_iter()
565                .map(
566                    |speaker: core_foundation::base::ItemRef<'_, *const c_void>| {
567                        CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string()
568                    },
569                )
570                .collect()
571        };
572
573        if let Some(room) = room.upgrade() {
574            room.active_speakers_changed(speakers);
575        }
576        let _ = Weak::into_raw(room);
577    }
578
579    extern "C" fn on_did_subscribe_to_remote_video_track(
580        room: *mut c_void,
581        publisher_id: CFStringRef,
582        track_id: CFStringRef,
583        track: *const c_void,
584    ) {
585        let room = unsafe { Weak::from_raw(room as *mut Room) };
586        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
587        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
588        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
589        if let Some(room) = room.upgrade() {
590            room.did_subscribe_to_remote_video_track(track);
591        }
592        let _ = Weak::into_raw(room);
593    }
594
595    extern "C" fn on_did_unsubscribe_from_remote_video_track(
596        room: *mut c_void,
597        publisher_id: CFStringRef,
598        track_id: CFStringRef,
599    ) {
600        let room = unsafe { Weak::from_raw(room as *mut Room) };
601        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
602        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
603        if let Some(room) = room.upgrade() {
604            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
605        }
606        let _ = Weak::into_raw(room);
607    }
608}
609
610impl Drop for RoomDelegate {
611    fn drop(&mut self) {
612        unsafe {
613            CFRelease(self.native_delegate);
614            let _ = Weak::from_raw(self.weak_room);
615        }
616    }
617}
618
619pub struct LocalAudioTrack(*const c_void);
620
621impl LocalAudioTrack {
622    pub fn create() -> Self {
623        Self(unsafe { LKLocalAudioTrackCreateTrack() })
624    }
625}
626
627impl Drop for LocalAudioTrack {
628    fn drop(&mut self) {
629        unsafe { CFRelease(self.0) }
630    }
631}
632
633pub struct LocalVideoTrack(*const c_void);
634
635impl LocalVideoTrack {
636    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
637        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
638    }
639}
640
641impl Drop for LocalVideoTrack {
642    fn drop(&mut self) {
643        unsafe { CFRelease(self.0) }
644    }
645}
646
647pub struct LocalTrackPublication(*const c_void);
648
649impl LocalTrackPublication {
650    pub fn new(native_track_publication: *const c_void) -> Self {
651        unsafe {
652            CFRetain(native_track_publication);
653        }
654        Self(native_track_publication)
655    }
656
657    pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
658        let (tx, rx) = futures::channel::oneshot::channel();
659
660        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
661            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
662            if error.is_null() {
663                tx.send(Ok(())).ok();
664            } else {
665                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
666                tx.send(Err(anyhow!(error))).ok();
667            }
668        }
669
670        unsafe {
671            LKLocalTrackPublicationSetMute(
672                self.0,
673                muted,
674                complete_callback,
675                Box::into_raw(Box::new(tx)) as *mut c_void,
676            )
677        }
678
679        async move { rx.await.unwrap() }
680    }
681}
682
683impl Drop for LocalTrackPublication {
684    fn drop(&mut self) {
685        unsafe { CFRelease(self.0) }
686    }
687}
688
689pub struct RemoteTrackPublication(*const c_void);
690
691impl RemoteTrackPublication {
692    pub fn new(native_track_publication: *const c_void) -> Self {
693        unsafe {
694            CFRetain(native_track_publication);
695        }
696        Self(native_track_publication)
697    }
698
699    pub fn sid(&self) -> String {
700        unsafe { CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid(self.0)).to_string() }
701    }
702
703    pub fn is_muted(&self) -> bool {
704        unsafe { LKRemoteTrackPublicationIsMuted(self.0) }
705    }
706
707    pub fn set_enabled(&self, enabled: bool) -> impl Future<Output = Result<()>> {
708        let (tx, rx) = futures::channel::oneshot::channel();
709
710        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
711            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
712            if error.is_null() {
713                tx.send(Ok(())).ok();
714            } else {
715                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
716                tx.send(Err(anyhow!(error))).ok();
717            }
718        }
719
720        unsafe {
721            LKRemoteTrackPublicationSetEnabled(
722                self.0,
723                enabled,
724                complete_callback,
725                Box::into_raw(Box::new(tx)) as *mut c_void,
726            )
727        }
728
729        async move { rx.await.unwrap() }
730    }
731}
732
733impl Drop for RemoteTrackPublication {
734    fn drop(&mut self) {
735        unsafe { CFRelease(self.0) }
736    }
737}
738
739#[derive(Debug)]
740pub struct RemoteAudioTrack {
741    _native_track: *const c_void,
742    sid: Sid,
743    publisher_id: String,
744}
745
746impl RemoteAudioTrack {
747    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
748        unsafe {
749            CFRetain(native_track);
750        }
751        Self {
752            _native_track: native_track,
753            sid,
754            publisher_id,
755        }
756    }
757
758    pub fn sid(&self) -> &str {
759        &self.sid
760    }
761
762    pub fn publisher_id(&self) -> &str {
763        &self.publisher_id
764    }
765
766    pub fn enable(&self) -> impl Future<Output = Result<()>> {
767        async { Ok(()) }
768    }
769
770    pub fn disable(&self) -> impl Future<Output = Result<()>> {
771        async { Ok(()) }
772    }
773}
774
775#[derive(Debug)]
776pub struct RemoteVideoTrack {
777    native_track: *const c_void,
778    sid: Sid,
779    publisher_id: String,
780}
781
782impl RemoteVideoTrack {
783    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
784        unsafe {
785            CFRetain(native_track);
786        }
787        Self {
788            native_track,
789            sid,
790            publisher_id,
791        }
792    }
793
794    pub fn sid(&self) -> &str {
795        &self.sid
796    }
797
798    pub fn publisher_id(&self) -> &str {
799        &self.publisher_id
800    }
801
802    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
803        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
804            unsafe {
805                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
806                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
807                let result = tx.try_broadcast(Frame(buffer));
808                let _ = Box::into_raw(tx);
809                match result {
810                    Ok(_) => true,
811                    Err(async_broadcast::TrySendError::Closed(_))
812                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
813                        log::warn!("no active receiver for frame");
814                        false
815                    }
816                    Err(async_broadcast::TrySendError::Full(_)) => {
817                        log::warn!("skipping frame as receiver is not keeping up");
818                        true
819                    }
820                }
821            }
822        }
823
824        extern "C" fn on_drop(callback_data: *mut c_void) {
825            unsafe {
826                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
827            }
828        }
829
830        let (tx, rx) = async_broadcast::broadcast(64);
831        unsafe {
832            let renderer = LKVideoRendererCreate(
833                Box::into_raw(Box::new(tx)) as *mut c_void,
834                on_frame,
835                on_drop,
836            );
837            LKVideoTrackAddRenderer(self.native_track, renderer);
838            rx
839        }
840    }
841}
842
843impl Drop for RemoteVideoTrack {
844    fn drop(&mut self) {
845        unsafe { CFRelease(self.native_track) }
846    }
847}
848
849pub enum RemoteVideoTrackUpdate {
850    Subscribed(Arc<RemoteVideoTrack>),
851    Unsubscribed { publisher_id: Sid, track_id: Sid },
852}
853
854pub enum RemoteAudioTrackUpdate {
855    ActiveSpeakersChanged { speakers: Vec<Sid> },
856    MuteChanged { track_id: Sid, muted: bool },
857    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
858    Unsubscribed { publisher_id: Sid, track_id: Sid },
859}
860
861pub struct MacOSDisplay(*const c_void);
862
863impl MacOSDisplay {
864    fn new(ptr: *const c_void) -> Self {
865        unsafe {
866            CFRetain(ptr);
867        }
868        Self(ptr)
869    }
870}
871
872impl Drop for MacOSDisplay {
873    fn drop(&mut self) {
874        unsafe { CFRelease(self.0) }
875    }
876}
877
878#[derive(Clone)]
879pub struct Frame(CVImageBuffer);
880
881impl Frame {
882    pub fn width(&self) -> usize {
883        self.0.width()
884    }
885
886    pub fn height(&self) -> usize {
887        self.0.height()
888    }
889
890    pub fn image(&self) -> CVImageBuffer {
891        self.0.clone()
892    }
893}