prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use std::{
 15    ffi::c_void,
 16    sync::{Arc, Weak},
 17};
 18
 19extern "C" {
 20    fn LKRoomDelegateCreate(
 21        callback_data: *mut c_void,
 22        on_did_subscribe_to_remote_video_track: extern "C" fn(
 23            callback_data: *mut c_void,
 24            publisher_id: CFStringRef,
 25            track_id: CFStringRef,
 26            remote_track: *const c_void,
 27        ),
 28        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 29            callback_data: *mut c_void,
 30            publisher_id: CFStringRef,
 31            track_id: CFStringRef,
 32        ),
 33    ) -> *const c_void;
 34
 35    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
 36    fn LKRoomConnect(
 37        room: *const c_void,
 38        url: CFStringRef,
 39        token: CFStringRef,
 40        callback: extern "C" fn(*mut c_void, CFStringRef),
 41        callback_data: *mut c_void,
 42    );
 43    fn LKRoomDisconnect(room: *const c_void);
 44    fn LKRoomPublishVideoTrack(
 45        room: *const c_void,
 46        track: *const c_void,
 47        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 48        callback_data: *mut c_void,
 49    );
 50    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
 51    fn LKRoomVideoTracksForRemoteParticipant(
 52        room: *const c_void,
 53        participant_id: CFStringRef,
 54    ) -> CFArrayRef;
 55
 56    fn LKVideoRendererCreate(
 57        callback_data: *mut c_void,
 58        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef),
 59        on_drop: extern "C" fn(callback_data: *mut c_void),
 60    ) -> *const c_void;
 61
 62    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
 63    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
 64
 65    fn LKDisplaySources(
 66        callback_data: *mut c_void,
 67        callback: extern "C" fn(
 68            callback_data: *mut c_void,
 69            sources: CFArrayRef,
 70            error: CFStringRef,
 71        ),
 72    );
 73    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
 74}
 75
 76pub type Sid = String;
 77
 78pub struct Room {
 79    native_room: *const c_void,
 80    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
 81    _delegate: RoomDelegate,
 82}
 83
 84impl Room {
 85    pub fn new() -> Arc<Self> {
 86        Arc::new_cyclic(|weak_room| {
 87            let delegate = RoomDelegate::new(weak_room.clone());
 88            Self {
 89                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
 90                remote_video_track_subscribers: Default::default(),
 91                _delegate: delegate,
 92            }
 93        })
 94    }
 95
 96    pub fn connect(&self, url: &str, token: &str) -> impl Future<Output = Result<()>> {
 97        let url = CFString::new(url);
 98        let token = CFString::new(token);
 99        let (did_connect, tx, rx) = Self::build_done_callback();
100        unsafe {
101            LKRoomConnect(
102                self.native_room,
103                url.as_concrete_TypeRef(),
104                token.as_concrete_TypeRef(),
105                did_connect,
106                tx,
107            )
108        }
109
110        async { rx.await.unwrap().context("error connecting to room") }
111    }
112
113    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
114        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
115            unsafe {
116                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
117
118                if sources.is_null() {
119                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
120                } else {
121                    let sources = CFArray::wrap_under_get_rule(sources)
122                        .into_iter()
123                        .map(|source| MacOSDisplay::new(*source))
124                        .collect();
125
126                    let _ = tx.send(Ok(sources));
127                }
128            }
129        }
130
131        let (tx, rx) = oneshot::channel();
132
133        unsafe {
134            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
135        }
136
137        async move { rx.await.unwrap() }
138    }
139
140    pub fn publish_video_track(
141        self: &Arc<Self>,
142        track: &LocalVideoTrack,
143    ) -> impl Future<Output = Result<LocalTrackPublication>> {
144        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
145        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
146            let tx =
147                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
148            if error.is_null() {
149                let _ = tx.send(Ok(LocalTrackPublication(publication)));
150            } else {
151                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
152                let _ = tx.send(Err(anyhow!(error)));
153            }
154        }
155        unsafe {
156            LKRoomPublishVideoTrack(
157                self.native_room,
158                track.0,
159                callback,
160                Box::into_raw(Box::new(tx)) as *mut c_void,
161            );
162        }
163        async { rx.await.unwrap().context("error publishing video track") }
164    }
165
166    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
167        unsafe {
168            LKRoomUnpublishTrack(self.native_room, publication.0);
169        }
170    }
171
172    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
173        unsafe {
174            let tracks = LKRoomVideoTracksForRemoteParticipant(
175                self.native_room,
176                CFString::new(participant_id).as_concrete_TypeRef(),
177            );
178
179            if tracks.is_null() {
180                Vec::new()
181            } else {
182                let tracks = CFArray::wrap_under_get_rule(tracks);
183                tracks
184                    .into_iter()
185                    .map(|native_track| {
186                        let native_track = *native_track;
187                        let id =
188                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
189                                .to_string();
190                        Arc::new(RemoteVideoTrack::new(
191                            native_track,
192                            id,
193                            participant_id.into(),
194                        ))
195                    })
196                    .collect()
197            }
198        }
199    }
200
201    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
202        let (tx, rx) = mpsc::unbounded();
203        self.remote_video_track_subscribers.lock().push(tx);
204        rx
205    }
206
207    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
208        let track = Arc::new(track);
209        self.remote_video_track_subscribers.lock().retain(|tx| {
210            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
211                .is_ok()
212        });
213    }
214
215    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
216        self.remote_video_track_subscribers.lock().retain(|tx| {
217            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
218                publisher_id: publisher_id.clone(),
219                track_id: track_id.clone(),
220            })
221            .is_ok()
222        });
223    }
224
225    fn build_done_callback() -> (
226        extern "C" fn(*mut c_void, CFStringRef),
227        *mut c_void,
228        oneshot::Receiver<Result<()>>,
229    ) {
230        let (tx, rx) = oneshot::channel();
231        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
232            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
233            if error.is_null() {
234                let _ = tx.send(Ok(()));
235            } else {
236                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
237                let _ = tx.send(Err(anyhow!(error)));
238            }
239        }
240        (
241            done_callback,
242            Box::into_raw(Box::new(tx)) as *mut c_void,
243            rx,
244        )
245    }
246}
247
248impl Drop for Room {
249    fn drop(&mut self) {
250        unsafe {
251            LKRoomDisconnect(self.native_room);
252            CFRelease(self.native_room);
253        }
254    }
255}
256
257struct RoomDelegate {
258    native_delegate: *const c_void,
259    weak_room: *const Room,
260}
261
262impl RoomDelegate {
263    fn new(weak_room: Weak<Room>) -> Self {
264        let weak_room = Weak::into_raw(weak_room);
265        let native_delegate = unsafe {
266            LKRoomDelegateCreate(
267                weak_room as *mut c_void,
268                Self::on_did_subscribe_to_remote_video_track,
269                Self::on_did_unsubscribe_from_remote_video_track,
270            )
271        };
272        Self {
273            native_delegate,
274            weak_room,
275        }
276    }
277
278    extern "C" fn on_did_subscribe_to_remote_video_track(
279        room: *mut c_void,
280        publisher_id: CFStringRef,
281        track_id: CFStringRef,
282        track: *const c_void,
283    ) {
284        let room = unsafe { Weak::from_raw(room as *mut Room) };
285        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
286        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
287        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
288        if let Some(room) = room.upgrade() {
289            room.did_subscribe_to_remote_video_track(track);
290        }
291        let _ = Weak::into_raw(room);
292    }
293
294    extern "C" fn on_did_unsubscribe_from_remote_video_track(
295        room: *mut c_void,
296        publisher_id: CFStringRef,
297        track_id: CFStringRef,
298    ) {
299        let room = unsafe { Weak::from_raw(room as *mut Room) };
300        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
301        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
302        if let Some(room) = room.upgrade() {
303            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
304        }
305        let _ = Weak::into_raw(room);
306    }
307}
308
309impl Drop for RoomDelegate {
310    fn drop(&mut self) {
311        unsafe {
312            CFRelease(self.native_delegate);
313            let _ = Weak::from_raw(self.weak_room);
314        }
315    }
316}
317
318pub struct LocalVideoTrack(*const c_void);
319
320impl LocalVideoTrack {
321    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
322        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
323    }
324}
325
326impl Drop for LocalVideoTrack {
327    fn drop(&mut self) {
328        unsafe { CFRelease(self.0) }
329    }
330}
331
332pub struct LocalTrackPublication(*const c_void);
333
334impl Drop for LocalTrackPublication {
335    fn drop(&mut self) {
336        unsafe { CFRelease(self.0) }
337    }
338}
339
340#[derive(Debug)]
341pub struct RemoteVideoTrack {
342    native_track: *const c_void,
343    sid: Sid,
344    publisher_id: String,
345}
346
347impl RemoteVideoTrack {
348    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
349        unsafe {
350            CFRetain(native_track);
351        }
352        Self {
353            native_track,
354            sid,
355            publisher_id,
356        }
357    }
358
359    pub fn sid(&self) -> &str {
360        &self.sid
361    }
362
363    pub fn publisher_id(&self) -> &str {
364        &self.publisher_id
365    }
366
367    pub fn add_renderer<F>(&self, callback: F)
368    where
369        F: 'static + Send + Sync + FnMut(Frame),
370    {
371        extern "C" fn on_frame<F>(callback_data: *mut c_void, frame: CVImageBufferRef)
372        where
373            F: FnMut(Frame),
374        {
375            unsafe {
376                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
377                let callback = &mut *(callback_data as *mut F);
378                callback(Frame(buffer));
379            }
380        }
381
382        extern "C" fn on_drop<F>(callback_data: *mut c_void) {
383            unsafe {
384                let _ = Box::from_raw(callback_data as *mut F);
385            }
386        }
387
388        let callback_data = Box::into_raw(Box::new(callback));
389        unsafe {
390            let renderer =
391                LKVideoRendererCreate(callback_data as *mut c_void, on_frame::<F>, on_drop::<F>);
392            LKVideoTrackAddRenderer(self.native_track, renderer);
393        }
394    }
395}
396
397impl Drop for RemoteVideoTrack {
398    fn drop(&mut self) {
399        unsafe { CFRelease(self.native_track) }
400    }
401}
402
403pub enum RemoteVideoTrackUpdate {
404    Subscribed(Arc<RemoteVideoTrack>),
405    Unsubscribed { publisher_id: Sid, track_id: Sid },
406}
407
408pub struct MacOSDisplay(*const c_void);
409
410impl MacOSDisplay {
411    fn new(ptr: *const c_void) -> Self {
412        unsafe {
413            CFRetain(ptr);
414        }
415        Self(ptr)
416    }
417}
418
419impl Drop for MacOSDisplay {
420    fn drop(&mut self) {
421        unsafe { CFRelease(self.0) }
422    }
423}
424
425pub struct Frame(CVImageBuffer);
426
427impl Frame {
428    pub fn width(&self) -> usize {
429        self.0.width()
430    }
431
432    pub fn height(&self) -> usize {
433        self.0.height()
434    }
435
436    pub fn image(&self) -> CVImageBuffer {
437        self.0.clone()
438    }
439}