prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use postage::watch;
 15use std::{
 16    ffi::c_void,
 17    sync::{Arc, Weak},
 18};
 19
 20extern "C" {
 21    fn LKRoomDelegateCreate(
 22        callback_data: *mut c_void,
 23        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
 24        on_did_subscribe_to_remote_audio_track: extern "C" fn(
 25            callback_data: *mut c_void,
 26            publisher_id: CFStringRef,
 27            track_id: CFStringRef,
 28            remote_track: *const c_void,
 29        ),
 30        on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
 31            callback_data: *mut c_void,
 32            publisher_id: CFStringRef,
 33            track_id: CFStringRef,
 34        ),
 35        on_did_subscribe_to_remote_video_track: extern "C" fn(
 36            callback_data: *mut c_void,
 37            publisher_id: CFStringRef,
 38            track_id: CFStringRef,
 39            remote_track: *const c_void,
 40        ),
 41        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 42            callback_data: *mut c_void,
 43            publisher_id: CFStringRef,
 44            track_id: CFStringRef,
 45        ),
 46    ) -> *const c_void;
 47
 48    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
 49    fn LKRoomConnect(
 50        room: *const c_void,
 51        url: CFStringRef,
 52        token: CFStringRef,
 53        callback: extern "C" fn(*mut c_void, CFStringRef),
 54        callback_data: *mut c_void,
 55    );
 56    fn LKRoomDisconnect(room: *const c_void);
 57    fn LKRoomPublishVideoTrack(
 58        room: *const c_void,
 59        track: *const c_void,
 60        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 61        callback_data: *mut c_void,
 62    );
 63    fn LKRoomPublishAudioTrack(
 64        room: *const c_void,
 65        track: *const c_void,
 66        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 67        callback_data: *mut c_void,
 68    );
 69    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
 70    fn LKRoomAudioTracksForRemoteParticipant(
 71        room: *const c_void,
 72        participant_id: CFStringRef,
 73    ) -> CFArrayRef;
 74
 75    fn LKRoomVideoTracksForRemoteParticipant(
 76        room: *const c_void,
 77        participant_id: CFStringRef,
 78    ) -> CFArrayRef;
 79
 80    fn LKVideoRendererCreate(
 81        callback_data: *mut c_void,
 82        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
 83        on_drop: extern "C" fn(callback_data: *mut c_void),
 84    ) -> *const c_void;
 85
 86    fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef;
 87    // fn LKRemoteAudioTrackStart(
 88    //     track: *const c_void,
 89    //     callback: extern "C" fn(*mut c_void, bool),
 90    //     callback_data: *mut c_void
 91    // );
 92
 93    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
 94    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
 95
 96    fn LKDisplaySources(
 97        callback_data: *mut c_void,
 98        callback: extern "C" fn(
 99            callback_data: *mut c_void,
100            sources: CFArrayRef,
101            error: CFStringRef,
102        ),
103    );
104    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
105    fn LKLocalAudioTrackCreateTrack() -> *const c_void;
106}
107
108pub type Sid = String;
109
110#[derive(Clone, Eq, PartialEq)]
111pub enum ConnectionState {
112    Disconnected,
113    Connected { url: String, token: String },
114}
115
116pub struct Room {
117    native_room: *const c_void,
118    connection: Mutex<(
119        watch::Sender<ConnectionState>,
120        watch::Receiver<ConnectionState>,
121    )>,
122    remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
123    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
124    _delegate: RoomDelegate,
125}
126
127impl Room {
128    pub fn new() -> Arc<Self> {
129        Arc::new_cyclic(|weak_room| {
130            let delegate = RoomDelegate::new(weak_room.clone());
131            Self {
132                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
133                connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
134                remote_audio_track_subscribers: Default::default(),
135                remote_video_track_subscribers: Default::default(),
136                _delegate: delegate,
137            }
138        })
139    }
140
141    pub fn status(&self) -> watch::Receiver<ConnectionState> {
142        self.connection.lock().1.clone()
143    }
144
145    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
146        let url = CFString::new(url);
147        let token = CFString::new(token);
148        let (did_connect, tx, rx) = Self::build_done_callback();
149        unsafe {
150            LKRoomConnect(
151                self.native_room,
152                url.as_concrete_TypeRef(),
153                token.as_concrete_TypeRef(),
154                did_connect,
155                tx,
156            )
157        }
158
159        let this = self.clone();
160        let url = url.to_string();
161        let token = token.to_string();
162        async move {
163            rx.await.unwrap().context("error connecting to room")?;
164            *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
165            Ok(())
166        }
167    }
168
169    fn did_disconnect(&self) {
170        *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
171    }
172
173    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
174        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
175            unsafe {
176                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
177
178                if sources.is_null() {
179                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
180                } else {
181                    let sources = CFArray::wrap_under_get_rule(sources)
182                        .into_iter()
183                        .map(|source| MacOSDisplay::new(*source))
184                        .collect();
185
186                    let _ = tx.send(Ok(sources));
187                }
188            }
189        }
190
191        let (tx, rx) = oneshot::channel();
192
193        unsafe {
194            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
195        }
196
197        async move { rx.await.unwrap() }
198    }
199
200    pub fn publish_video_track(
201        self: &Arc<Self>,
202        track: &LocalVideoTrack,
203    ) -> impl Future<Output = Result<LocalTrackPublication>> {
204        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
205        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
206            let tx =
207                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
208            if error.is_null() {
209                let _ = tx.send(Ok(LocalTrackPublication(publication)));
210            } else {
211                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
212                let _ = tx.send(Err(anyhow!(error)));
213            }
214        }
215        unsafe {
216            LKRoomPublishVideoTrack(
217                self.native_room,
218                track.0,
219                callback,
220                Box::into_raw(Box::new(tx)) as *mut c_void,
221            );
222        }
223        async { rx.await.unwrap().context("error publishing video track") }
224    }
225
226    pub fn publish_audio_track(
227        self: &Arc<Self>,
228        track: &LocalAudioTrack,
229    ) -> impl Future<Output = Result<LocalTrackPublication>> {
230        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
231        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
232            let tx =
233                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
234            if error.is_null() {
235                let _ = tx.send(Ok(LocalTrackPublication(publication)));
236            } else {
237                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
238                let _ = tx.send(Err(anyhow!(error)));
239            }
240        }
241        unsafe {
242            LKRoomPublishAudioTrack(
243                self.native_room,
244                track.0,
245                callback,
246                Box::into_raw(Box::new(tx)) as *mut c_void,
247            );
248        }
249        async { rx.await.unwrap().context("error publishing video track") }
250    }
251
252    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
253        unsafe {
254            LKRoomUnpublishTrack(self.native_room, publication.0);
255        }
256    }
257
258    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
259        unsafe {
260            let tracks = LKRoomVideoTracksForRemoteParticipant(
261                self.native_room,
262                CFString::new(participant_id).as_concrete_TypeRef(),
263            );
264
265            if tracks.is_null() {
266                Vec::new()
267            } else {
268                let tracks = CFArray::wrap_under_get_rule(tracks);
269                tracks
270                    .into_iter()
271                    .map(|native_track| {
272                        let native_track = *native_track;
273                        let id =
274                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
275                                .to_string();
276                        Arc::new(RemoteVideoTrack::new(
277                            native_track,
278                            id,
279                            participant_id.into(),
280                        ))
281                    })
282                    .collect()
283            }
284        }
285    }
286
287    pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
288        unsafe {
289            let tracks = LKRoomAudioTracksForRemoteParticipant(
290                self.native_room,
291                CFString::new(participant_id).as_concrete_TypeRef(),
292            );
293
294            if tracks.is_null() {
295                Vec::new()
296            } else {
297                let tracks = CFArray::wrap_under_get_rule(tracks);
298                tracks
299                    .into_iter()
300                    .map(|native_track| {
301                        let native_track = *native_track;
302                        let id =
303                            CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
304                                .to_string();
305                        Arc::new(RemoteAudioTrack::new(
306                            native_track,
307                            id,
308                            participant_id.into(),
309                        ))
310                    })
311                    .collect()
312            }
313        }
314    }
315
316    pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
317        let (tx, rx) = mpsc::unbounded();
318        self.remote_audio_track_subscribers.lock().push(tx);
319        rx
320    }
321
322    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
323        let (tx, rx) = mpsc::unbounded();
324        self.remote_video_track_subscribers.lock().push(tx);
325        rx
326    }
327
328    fn did_subscribe_to_remote_audio_track(&self, track: RemoteAudioTrack) {
329        let track = Arc::new(track);
330        self.remote_audio_track_subscribers.lock().retain(|tx| {
331            tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(track.clone()))
332                .is_ok()
333        });
334    }
335
336    fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
337        self.remote_audio_track_subscribers.lock().retain(|tx| {
338            tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
339                publisher_id: publisher_id.clone(),
340                track_id: track_id.clone(),
341            })
342            .is_ok()
343        });
344    }
345
346    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
347        let track = Arc::new(track);
348        self.remote_video_track_subscribers.lock().retain(|tx| {
349            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
350                .is_ok()
351        });
352    }
353
354    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
355        self.remote_video_track_subscribers.lock().retain(|tx| {
356            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
357                publisher_id: publisher_id.clone(),
358                track_id: track_id.clone(),
359            })
360            .is_ok()
361        });
362    }
363
364    fn build_done_callback() -> (
365        extern "C" fn(*mut c_void, CFStringRef),
366        *mut c_void,
367        oneshot::Receiver<Result<()>>,
368    ) {
369        let (tx, rx) = oneshot::channel();
370        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
371            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
372            if error.is_null() {
373                let _ = tx.send(Ok(()));
374            } else {
375                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
376                let _ = tx.send(Err(anyhow!(error)));
377            }
378        }
379        (
380            done_callback,
381            Box::into_raw(Box::new(tx)) as *mut c_void,
382            rx,
383        )
384    }
385}
386
387impl Drop for Room {
388    fn drop(&mut self) {
389        unsafe {
390            LKRoomDisconnect(self.native_room);
391            CFRelease(self.native_room);
392        }
393    }
394}
395
396struct RoomDelegate {
397    native_delegate: *const c_void,
398    weak_room: *const Room,
399}
400
401impl RoomDelegate {
402    fn new(weak_room: Weak<Room>) -> Self {
403        let weak_room = Weak::into_raw(weak_room);
404        let native_delegate = unsafe {
405            LKRoomDelegateCreate(
406                weak_room as *mut c_void,
407                Self::on_did_disconnect,
408                Self::on_did_subscribe_to_remote_audio_track,
409                Self::on_did_unsubscribe_from_remote_audio_track,
410                Self::on_did_subscribe_to_remote_video_track,
411                Self::on_did_unsubscribe_from_remote_video_track,
412            )
413        };
414        Self {
415            native_delegate,
416            weak_room,
417        }
418    }
419
420    extern "C" fn on_did_disconnect(room: *mut c_void) {
421        let room = unsafe { Weak::from_raw(room as *mut Room) };
422        if let Some(room) = room.upgrade() {
423            room.did_disconnect();
424        }
425        let _ = Weak::into_raw(room);
426    }
427
428    extern "C" fn on_did_subscribe_to_remote_audio_track(
429        room: *mut c_void,
430        publisher_id: CFStringRef,
431        track_id: CFStringRef,
432        track: *const c_void,
433    ) {
434        let room = unsafe { Weak::from_raw(room as *mut Room) };
435        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
436        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
437        let track = RemoteAudioTrack::new(track, track_id, publisher_id);
438        if let Some(room) = room.upgrade() {
439            room.did_subscribe_to_remote_audio_track(track);
440        }
441        let _ = Weak::into_raw(room);
442    }
443
444    extern "C" fn on_did_unsubscribe_from_remote_audio_track(
445        room: *mut c_void,
446        publisher_id: CFStringRef,
447        track_id: CFStringRef,
448    ) {
449        let room = unsafe { Weak::from_raw(room as *mut Room) };
450        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
451        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
452        if let Some(room) = room.upgrade() {
453            room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
454        }
455        let _ = Weak::into_raw(room);
456    }
457
458    extern "C" fn on_did_subscribe_to_remote_video_track(
459        room: *mut c_void,
460        publisher_id: CFStringRef,
461        track_id: CFStringRef,
462        track: *const c_void,
463    ) {
464        let room = unsafe { Weak::from_raw(room as *mut Room) };
465        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
466        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
467        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
468        if let Some(room) = room.upgrade() {
469            room.did_subscribe_to_remote_video_track(track);
470        }
471        let _ = Weak::into_raw(room);
472    }
473
474    extern "C" fn on_did_unsubscribe_from_remote_video_track(
475        room: *mut c_void,
476        publisher_id: CFStringRef,
477        track_id: CFStringRef,
478    ) {
479        let room = unsafe { Weak::from_raw(room as *mut Room) };
480        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
481        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
482        if let Some(room) = room.upgrade() {
483            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
484        }
485        let _ = Weak::into_raw(room);
486    }
487}
488
489impl Drop for RoomDelegate {
490    fn drop(&mut self) {
491        unsafe {
492            CFRelease(self.native_delegate);
493            let _ = Weak::from_raw(self.weak_room);
494        }
495    }
496}
497
498pub struct LocalAudioTrack(*const c_void);
499
500impl LocalAudioTrack {
501    pub fn create() -> Self {
502        Self(unsafe { LKLocalAudioTrackCreateTrack() })
503    }
504}
505
506impl Drop for LocalAudioTrack {
507    fn drop(&mut self) {
508        unsafe { CFRelease(self.0) }
509    }
510}
511
512pub struct LocalVideoTrack(*const c_void);
513
514impl LocalVideoTrack {
515    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
516        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
517    }
518}
519
520impl Drop for LocalVideoTrack {
521    fn drop(&mut self) {
522        unsafe { CFRelease(self.0) }
523    }
524}
525
526pub struct LocalTrackPublication(*const c_void);
527
528impl Drop for LocalTrackPublication {
529    fn drop(&mut self) {
530        unsafe { CFRelease(self.0) }
531    }
532}
533
534#[derive(Debug)]
535pub struct RemoteAudioTrack {
536    _native_track: *const c_void,
537    sid: Sid,
538    publisher_id: String,
539}
540
541impl RemoteAudioTrack {
542    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
543        unsafe {
544            CFRetain(native_track);
545        }
546        Self {
547            _native_track: native_track,
548            sid,
549            publisher_id,
550        }
551    }
552
553    pub fn sid(&self) -> &str {
554        &self.sid
555    }
556
557    pub fn publisher_id(&self) -> &str {
558        &self.publisher_id
559    }
560}
561
562#[derive(Debug)]
563pub struct RemoteVideoTrack {
564    native_track: *const c_void,
565    sid: Sid,
566    publisher_id: String,
567}
568
569impl RemoteVideoTrack {
570    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
571        unsafe {
572            CFRetain(native_track);
573        }
574        Self {
575            native_track,
576            sid,
577            publisher_id,
578        }
579    }
580
581    pub fn sid(&self) -> &str {
582        &self.sid
583    }
584
585    pub fn publisher_id(&self) -> &str {
586        &self.publisher_id
587    }
588
589    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
590        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
591            unsafe {
592                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
593                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
594                let result = tx.try_broadcast(Frame(buffer));
595                let _ = Box::into_raw(tx);
596                match result {
597                    Ok(_) => true,
598                    Err(async_broadcast::TrySendError::Closed(_))
599                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
600                        log::warn!("no active receiver for frame");
601                        false
602                    }
603                    Err(async_broadcast::TrySendError::Full(_)) => {
604                        log::warn!("skipping frame as receiver is not keeping up");
605                        true
606                    }
607                }
608            }
609        }
610
611        extern "C" fn on_drop(callback_data: *mut c_void) {
612            unsafe {
613                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
614            }
615        }
616
617        let (tx, rx) = async_broadcast::broadcast(64);
618        unsafe {
619            let renderer = LKVideoRendererCreate(
620                Box::into_raw(Box::new(tx)) as *mut c_void,
621                on_frame,
622                on_drop,
623            );
624            LKVideoTrackAddRenderer(self.native_track, renderer);
625            rx
626        }
627    }
628}
629
630impl Drop for RemoteVideoTrack {
631    fn drop(&mut self) {
632        unsafe { CFRelease(self.native_track) }
633    }
634}
635
636pub enum RemoteVideoTrackUpdate {
637    Subscribed(Arc<RemoteVideoTrack>),
638    Unsubscribed { publisher_id: Sid, track_id: Sid },
639}
640
641pub enum RemoteAudioTrackUpdate {
642    Subscribed(Arc<RemoteAudioTrack>),
643    Unsubscribed { publisher_id: Sid, track_id: Sid },
644}
645
646pub struct MacOSDisplay(*const c_void);
647
648impl MacOSDisplay {
649    fn new(ptr: *const c_void) -> Self {
650        unsafe {
651            CFRetain(ptr);
652        }
653        Self(ptr)
654    }
655}
656
657impl Drop for MacOSDisplay {
658    fn drop(&mut self) {
659        unsafe { CFRelease(self.0) }
660    }
661}
662
663#[derive(Clone)]
664pub struct Frame(CVImageBuffer);
665
666impl Frame {
667    pub fn width(&self) -> usize {
668        self.0.width()
669    }
670
671    pub fn height(&self) -> usize {
672        self.0.height()
673    }
674
675    pub fn image(&self) -> CVImageBuffer {
676        self.0.clone()
677    }
678}