prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use std::{
 15    ffi::c_void,
 16    sync::{Arc, Weak},
 17};
 18
 19extern "C" {
 20    fn LKRoomDelegateCreate(
 21        callback_data: *mut c_void,
 22        on_did_subscribe_to_remote_video_track: extern "C" fn(
 23            callback_data: *mut c_void,
 24            publisher_id: CFStringRef,
 25            track_id: CFStringRef,
 26            remote_track: *const c_void,
 27        ),
 28        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 29            callback_data: *mut c_void,
 30            publisher_id: CFStringRef,
 31            track_id: CFStringRef,
 32        ),
 33    ) -> *const c_void;
 34
 35    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
 36    fn LKRoomConnect(
 37        room: *const c_void,
 38        url: CFStringRef,
 39        token: CFStringRef,
 40        callback: extern "C" fn(*mut c_void, CFStringRef),
 41        callback_data: *mut c_void,
 42    );
 43    fn LKRoomDisconnect(room: *const c_void);
 44    fn LKRoomPublishVideoTrack(
 45        room: *const c_void,
 46        track: *const c_void,
 47        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 48        callback_data: *mut c_void,
 49    );
 50    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
 51    fn LKRoomVideoTracksForRemoteParticipant(
 52        room: *const c_void,
 53        participant_id: CFStringRef,
 54    ) -> CFArrayRef;
 55
 56    fn LKVideoRendererCreate(
 57        callback_data: *mut c_void,
 58        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
 59        on_drop: extern "C" fn(callback_data: *mut c_void),
 60    ) -> *const c_void;
 61
 62    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
 63    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
 64
 65    fn LKDisplaySources(
 66        callback_data: *mut c_void,
 67        callback: extern "C" fn(
 68            callback_data: *mut c_void,
 69            sources: CFArrayRef,
 70            error: CFStringRef,
 71        ),
 72    );
 73    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
 74}
 75
 76pub type Sid = String;
 77
 78pub struct Room {
 79    native_room: *const c_void,
 80    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
 81    _delegate: RoomDelegate,
 82}
 83
 84impl Room {
 85    pub fn new() -> Arc<Self> {
 86        Arc::new_cyclic(|weak_room| {
 87            let delegate = RoomDelegate::new(weak_room.clone());
 88            Self {
 89                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
 90                remote_video_track_subscribers: Default::default(),
 91                _delegate: delegate,
 92            }
 93        })
 94    }
 95
 96    pub fn connect(&self, url: &str, token: &str) -> impl Future<Output = Result<()>> {
 97        let url = CFString::new(url);
 98        let token = CFString::new(token);
 99        let (did_connect, tx, rx) = Self::build_done_callback();
100        unsafe {
101            LKRoomConnect(
102                self.native_room,
103                url.as_concrete_TypeRef(),
104                token.as_concrete_TypeRef(),
105                did_connect,
106                tx,
107            )
108        }
109
110        async { rx.await.unwrap().context("error connecting to room") }
111    }
112
113    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
114        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
115            unsafe {
116                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
117
118                if sources.is_null() {
119                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
120                } else {
121                    let sources = CFArray::wrap_under_get_rule(sources)
122                        .into_iter()
123                        .map(|source| MacOSDisplay::new(*source))
124                        .collect();
125
126                    let _ = tx.send(Ok(sources));
127                }
128            }
129        }
130
131        let (tx, rx) = oneshot::channel();
132
133        unsafe {
134            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
135        }
136
137        async move { rx.await.unwrap() }
138    }
139
140    pub fn publish_video_track(
141        self: &Arc<Self>,
142        track: &LocalVideoTrack,
143    ) -> impl Future<Output = Result<LocalTrackPublication>> {
144        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
145        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
146            let tx =
147                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
148            if error.is_null() {
149                let _ = tx.send(Ok(LocalTrackPublication(publication)));
150            } else {
151                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
152                let _ = tx.send(Err(anyhow!(error)));
153            }
154        }
155        unsafe {
156            LKRoomPublishVideoTrack(
157                self.native_room,
158                track.0,
159                callback,
160                Box::into_raw(Box::new(tx)) as *mut c_void,
161            );
162        }
163        async { rx.await.unwrap().context("error publishing video track") }
164    }
165
166    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
167        unsafe {
168            LKRoomUnpublishTrack(self.native_room, publication.0);
169        }
170    }
171
172    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
173        unsafe {
174            let tracks = LKRoomVideoTracksForRemoteParticipant(
175                self.native_room,
176                CFString::new(participant_id).as_concrete_TypeRef(),
177            );
178
179            if tracks.is_null() {
180                Vec::new()
181            } else {
182                let tracks = CFArray::wrap_under_get_rule(tracks);
183                tracks
184                    .into_iter()
185                    .map(|native_track| {
186                        let native_track = *native_track;
187                        let id =
188                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
189                                .to_string();
190                        Arc::new(RemoteVideoTrack::new(
191                            native_track,
192                            id,
193                            participant_id.into(),
194                        ))
195                    })
196                    .collect()
197            }
198        }
199    }
200
201    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
202        let (tx, rx) = mpsc::unbounded();
203        self.remote_video_track_subscribers.lock().push(tx);
204        rx
205    }
206
207    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
208        let track = Arc::new(track);
209        self.remote_video_track_subscribers.lock().retain(|tx| {
210            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
211                .is_ok()
212        });
213    }
214
215    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
216        self.remote_video_track_subscribers.lock().retain(|tx| {
217            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
218                publisher_id: publisher_id.clone(),
219                track_id: track_id.clone(),
220            })
221            .is_ok()
222        });
223    }
224
225    fn build_done_callback() -> (
226        extern "C" fn(*mut c_void, CFStringRef),
227        *mut c_void,
228        oneshot::Receiver<Result<()>>,
229    ) {
230        let (tx, rx) = oneshot::channel();
231        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
232            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
233            if error.is_null() {
234                let _ = tx.send(Ok(()));
235            } else {
236                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
237                let _ = tx.send(Err(anyhow!(error)));
238            }
239        }
240        (
241            done_callback,
242            Box::into_raw(Box::new(tx)) as *mut c_void,
243            rx,
244        )
245    }
246}
247
248impl Drop for Room {
249    fn drop(&mut self) {
250        unsafe {
251            LKRoomDisconnect(self.native_room);
252            CFRelease(self.native_room);
253        }
254    }
255}
256
257struct RoomDelegate {
258    native_delegate: *const c_void,
259    weak_room: *const Room,
260}
261
262impl RoomDelegate {
263    fn new(weak_room: Weak<Room>) -> Self {
264        let weak_room = Weak::into_raw(weak_room);
265        let native_delegate = unsafe {
266            LKRoomDelegateCreate(
267                weak_room as *mut c_void,
268                Self::on_did_subscribe_to_remote_video_track,
269                Self::on_did_unsubscribe_from_remote_video_track,
270            )
271        };
272        Self {
273            native_delegate,
274            weak_room,
275        }
276    }
277
278    extern "C" fn on_did_subscribe_to_remote_video_track(
279        room: *mut c_void,
280        publisher_id: CFStringRef,
281        track_id: CFStringRef,
282        track: *const c_void,
283    ) {
284        let room = unsafe { Weak::from_raw(room as *mut Room) };
285        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
286        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
287        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
288        if let Some(room) = room.upgrade() {
289            room.did_subscribe_to_remote_video_track(track);
290        }
291        let _ = Weak::into_raw(room);
292    }
293
294    extern "C" fn on_did_unsubscribe_from_remote_video_track(
295        room: *mut c_void,
296        publisher_id: CFStringRef,
297        track_id: CFStringRef,
298    ) {
299        let room = unsafe { Weak::from_raw(room as *mut Room) };
300        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
301        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
302        if let Some(room) = room.upgrade() {
303            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
304        }
305        let _ = Weak::into_raw(room);
306    }
307}
308
309impl Drop for RoomDelegate {
310    fn drop(&mut self) {
311        unsafe {
312            CFRelease(self.native_delegate);
313            let _ = Weak::from_raw(self.weak_room);
314        }
315    }
316}
317
318pub struct LocalVideoTrack(*const c_void);
319
320impl LocalVideoTrack {
321    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
322        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
323    }
324}
325
326impl Drop for LocalVideoTrack {
327    fn drop(&mut self) {
328        unsafe { CFRelease(self.0) }
329    }
330}
331
332pub struct LocalTrackPublication(*const c_void);
333
334impl Drop for LocalTrackPublication {
335    fn drop(&mut self) {
336        unsafe { CFRelease(self.0) }
337    }
338}
339
340#[derive(Debug)]
341pub struct RemoteVideoTrack {
342    native_track: *const c_void,
343    sid: Sid,
344    publisher_id: String,
345}
346
347impl RemoteVideoTrack {
348    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
349        unsafe {
350            CFRetain(native_track);
351        }
352        Self {
353            native_track,
354            sid,
355            publisher_id,
356        }
357    }
358
359    pub fn sid(&self) -> &str {
360        &self.sid
361    }
362
363    pub fn publisher_id(&self) -> &str {
364        &self.publisher_id
365    }
366
367    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
368        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
369            unsafe {
370                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
371                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
372                let result = tx.try_broadcast(Frame(buffer));
373                let _ = Box::into_raw(tx);
374                match result {
375                    Ok(_) => true,
376                    Err(async_broadcast::TrySendError::Closed(_))
377                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
378                        log::warn!("no active receiver for frame");
379                        false
380                    }
381                    Err(async_broadcast::TrySendError::Full(_)) => {
382                        log::warn!("skipping frame as receiver is not keeping up");
383                        true
384                    }
385                }
386            }
387        }
388
389        extern "C" fn on_drop(callback_data: *mut c_void) {
390            unsafe {
391                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
392            }
393        }
394
395        let (tx, rx) = async_broadcast::broadcast(64);
396        unsafe {
397            let renderer = LKVideoRendererCreate(
398                Box::into_raw(Box::new(tx)) as *mut c_void,
399                on_frame,
400                on_drop,
401            );
402            LKVideoTrackAddRenderer(self.native_track, renderer);
403            rx
404        }
405    }
406}
407
408impl Drop for RemoteVideoTrack {
409    fn drop(&mut self) {
410        unsafe { CFRelease(self.native_track) }
411    }
412}
413
414pub enum RemoteVideoTrackUpdate {
415    Subscribed(Arc<RemoteVideoTrack>),
416    Unsubscribed { publisher_id: Sid, track_id: Sid },
417}
418
419pub struct MacOSDisplay(*const c_void);
420
421impl MacOSDisplay {
422    fn new(ptr: *const c_void) -> Self {
423        unsafe {
424            CFRetain(ptr);
425        }
426        Self(ptr)
427    }
428}
429
430impl Drop for MacOSDisplay {
431    fn drop(&mut self) {
432        unsafe { CFRelease(self.0) }
433    }
434}
435
436#[derive(Clone)]
437pub struct Frame(CVImageBuffer);
438
439impl Frame {
440    pub fn width(&self) -> usize {
441        self.0.width()
442    }
443
444    pub fn height(&self) -> usize {
445        self.0.height()
446    }
447
448    pub fn image(&self) -> CVImageBuffer {
449        self.0.clone()
450    }
451}