prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use postage::watch;
 15use std::{
 16    ffi::c_void,
 17    sync::{Arc, Weak},
 18};
 19
 20extern "C" {
 21    fn LKRoomDelegateCreate(
 22        callback_data: *mut c_void,
 23        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
 24        on_did_subscribe_to_remote_video_track: extern "C" fn(
 25            callback_data: *mut c_void,
 26            publisher_id: CFStringRef,
 27            track_id: CFStringRef,
 28            remote_track: *const c_void,
 29        ),
 30        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 31            callback_data: *mut c_void,
 32            publisher_id: CFStringRef,
 33            track_id: CFStringRef,
 34        ),
 35    ) -> *const c_void;
 36
 37    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
 38    fn LKRoomConnect(
 39        room: *const c_void,
 40        url: CFStringRef,
 41        token: CFStringRef,
 42        callback: extern "C" fn(*mut c_void, CFStringRef),
 43        callback_data: *mut c_void,
 44    );
 45    fn LKRoomDisconnect(room: *const c_void);
 46    fn LKRoomPublishVideoTrack(
 47        room: *const c_void,
 48        track: *const c_void,
 49        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 50        callback_data: *mut c_void,
 51    );
 52    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
 53    fn LKRoomVideoTracksForRemoteParticipant(
 54        room: *const c_void,
 55        participant_id: CFStringRef,
 56    ) -> CFArrayRef;
 57
 58    fn LKVideoRendererCreate(
 59        callback_data: *mut c_void,
 60        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
 61        on_drop: extern "C" fn(callback_data: *mut c_void),
 62    ) -> *const c_void;
 63
 64    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
 65    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
 66
 67    fn LKDisplaySources(
 68        callback_data: *mut c_void,
 69        callback: extern "C" fn(
 70            callback_data: *mut c_void,
 71            sources: CFArrayRef,
 72            error: CFStringRef,
 73        ),
 74    );
 75    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
 76}
 77
 78pub type Sid = String;
 79
 80#[derive(Clone, Eq, PartialEq)]
 81pub enum ConnectionState {
 82    Disconnected,
 83    Connected { url: String, token: String },
 84}
 85
 86pub struct Room {
 87    native_room: *const c_void,
 88    connection: Mutex<(
 89        watch::Sender<ConnectionState>,
 90        watch::Receiver<ConnectionState>,
 91    )>,
 92    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
 93    _delegate: RoomDelegate,
 94}
 95
 96impl Room {
 97    pub fn new() -> Arc<Self> {
 98        Arc::new_cyclic(|weak_room| {
 99            let delegate = RoomDelegate::new(weak_room.clone());
100            Self {
101                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
102                connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
103                remote_video_track_subscribers: Default::default(),
104                _delegate: delegate,
105            }
106        })
107    }
108
109    pub fn status(&self) -> watch::Receiver<ConnectionState> {
110        self.connection.lock().1.clone()
111    }
112
113    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
114        let url = CFString::new(url);
115        let token = CFString::new(token);
116        let (did_connect, tx, rx) = Self::build_done_callback();
117        unsafe {
118            LKRoomConnect(
119                self.native_room,
120                url.as_concrete_TypeRef(),
121                token.as_concrete_TypeRef(),
122                did_connect,
123                tx,
124            )
125        }
126
127        let this = self.clone();
128        let url = url.to_string();
129        let token = token.to_string();
130        async move {
131            match rx.await.unwrap().context("error connecting to room") {
132                Ok(()) => {
133                    *this.connection.lock().0.borrow_mut() =
134                        ConnectionState::Connected { url, token };
135                    Ok(())
136                }
137                Err(err) => Err(err),
138            }
139        }
140    }
141
142    fn did_disconnect(&self) {
143        *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
144    }
145
146    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
147        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
148            unsafe {
149                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
150
151                if sources.is_null() {
152                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
153                } else {
154                    let sources = CFArray::wrap_under_get_rule(sources)
155                        .into_iter()
156                        .map(|source| MacOSDisplay::new(*source))
157                        .collect();
158
159                    let _ = tx.send(Ok(sources));
160                }
161            }
162        }
163
164        let (tx, rx) = oneshot::channel();
165
166        unsafe {
167            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
168        }
169
170        async move { rx.await.unwrap() }
171    }
172
173    pub fn publish_video_track(
174        self: &Arc<Self>,
175        track: &LocalVideoTrack,
176    ) -> impl Future<Output = Result<LocalTrackPublication>> {
177        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
178        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
179            let tx =
180                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
181            if error.is_null() {
182                let _ = tx.send(Ok(LocalTrackPublication(publication)));
183            } else {
184                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
185                let _ = tx.send(Err(anyhow!(error)));
186            }
187        }
188        unsafe {
189            LKRoomPublishVideoTrack(
190                self.native_room,
191                track.0,
192                callback,
193                Box::into_raw(Box::new(tx)) as *mut c_void,
194            );
195        }
196        async { rx.await.unwrap().context("error publishing video track") }
197    }
198
199    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
200        unsafe {
201            LKRoomUnpublishTrack(self.native_room, publication.0);
202        }
203    }
204
205    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
206        unsafe {
207            let tracks = LKRoomVideoTracksForRemoteParticipant(
208                self.native_room,
209                CFString::new(participant_id).as_concrete_TypeRef(),
210            );
211
212            if tracks.is_null() {
213                Vec::new()
214            } else {
215                let tracks = CFArray::wrap_under_get_rule(tracks);
216                tracks
217                    .into_iter()
218                    .map(|native_track| {
219                        let native_track = *native_track;
220                        let id =
221                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
222                                .to_string();
223                        Arc::new(RemoteVideoTrack::new(
224                            native_track,
225                            id,
226                            participant_id.into(),
227                        ))
228                    })
229                    .collect()
230            }
231        }
232    }
233
234    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
235        let (tx, rx) = mpsc::unbounded();
236        self.remote_video_track_subscribers.lock().push(tx);
237        rx
238    }
239
240    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
241        let track = Arc::new(track);
242        self.remote_video_track_subscribers.lock().retain(|tx| {
243            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
244                .is_ok()
245        });
246    }
247
248    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
249        self.remote_video_track_subscribers.lock().retain(|tx| {
250            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
251                publisher_id: publisher_id.clone(),
252                track_id: track_id.clone(),
253            })
254            .is_ok()
255        });
256    }
257
258    fn build_done_callback() -> (
259        extern "C" fn(*mut c_void, CFStringRef),
260        *mut c_void,
261        oneshot::Receiver<Result<()>>,
262    ) {
263        let (tx, rx) = oneshot::channel();
264        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
265            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
266            if error.is_null() {
267                let _ = tx.send(Ok(()));
268            } else {
269                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
270                let _ = tx.send(Err(anyhow!(error)));
271            }
272        }
273        (
274            done_callback,
275            Box::into_raw(Box::new(tx)) as *mut c_void,
276            rx,
277        )
278    }
279}
280
281impl Drop for Room {
282    fn drop(&mut self) {
283        unsafe {
284            LKRoomDisconnect(self.native_room);
285            CFRelease(self.native_room);
286        }
287    }
288}
289
290struct RoomDelegate {
291    native_delegate: *const c_void,
292    weak_room: *const Room,
293}
294
295impl RoomDelegate {
296    fn new(weak_room: Weak<Room>) -> Self {
297        let weak_room = Weak::into_raw(weak_room);
298        let native_delegate = unsafe {
299            LKRoomDelegateCreate(
300                weak_room as *mut c_void,
301                Self::on_did_disconnect,
302                Self::on_did_subscribe_to_remote_video_track,
303                Self::on_did_unsubscribe_from_remote_video_track,
304            )
305        };
306        Self {
307            native_delegate,
308            weak_room,
309        }
310    }
311
312    extern "C" fn on_did_disconnect(room: *mut c_void) {
313        let room = unsafe { Weak::from_raw(room as *mut Room) };
314        if let Some(room) = room.upgrade() {
315            room.did_disconnect();
316        }
317        let _ = Weak::into_raw(room);
318    }
319
320    extern "C" fn on_did_subscribe_to_remote_video_track(
321        room: *mut c_void,
322        publisher_id: CFStringRef,
323        track_id: CFStringRef,
324        track: *const c_void,
325    ) {
326        let room = unsafe { Weak::from_raw(room as *mut Room) };
327        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
328        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
329        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
330        if let Some(room) = room.upgrade() {
331            room.did_subscribe_to_remote_video_track(track);
332        }
333        let _ = Weak::into_raw(room);
334    }
335
336    extern "C" fn on_did_unsubscribe_from_remote_video_track(
337        room: *mut c_void,
338        publisher_id: CFStringRef,
339        track_id: CFStringRef,
340    ) {
341        let room = unsafe { Weak::from_raw(room as *mut Room) };
342        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
343        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
344        if let Some(room) = room.upgrade() {
345            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
346        }
347        let _ = Weak::into_raw(room);
348    }
349}
350
351impl Drop for RoomDelegate {
352    fn drop(&mut self) {
353        unsafe {
354            CFRelease(self.native_delegate);
355            let _ = Weak::from_raw(self.weak_room);
356        }
357    }
358}
359
360pub struct LocalVideoTrack(*const c_void);
361
362impl LocalVideoTrack {
363    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
364        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
365    }
366}
367
368impl Drop for LocalVideoTrack {
369    fn drop(&mut self) {
370        unsafe { CFRelease(self.0) }
371    }
372}
373
374pub struct LocalTrackPublication(*const c_void);
375
376impl Drop for LocalTrackPublication {
377    fn drop(&mut self) {
378        unsafe { CFRelease(self.0) }
379    }
380}
381
382#[derive(Debug)]
383pub struct RemoteVideoTrack {
384    native_track: *const c_void,
385    sid: Sid,
386    publisher_id: String,
387}
388
389impl RemoteVideoTrack {
390    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
391        unsafe {
392            CFRetain(native_track);
393        }
394        Self {
395            native_track,
396            sid,
397            publisher_id,
398        }
399    }
400
401    pub fn sid(&self) -> &str {
402        &self.sid
403    }
404
405    pub fn publisher_id(&self) -> &str {
406        &self.publisher_id
407    }
408
409    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
410        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
411            unsafe {
412                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
413                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
414                let result = tx.try_broadcast(Frame(buffer));
415                let _ = Box::into_raw(tx);
416                match result {
417                    Ok(_) => true,
418                    Err(async_broadcast::TrySendError::Closed(_))
419                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
420                        log::warn!("no active receiver for frame");
421                        false
422                    }
423                    Err(async_broadcast::TrySendError::Full(_)) => {
424                        log::warn!("skipping frame as receiver is not keeping up");
425                        true
426                    }
427                }
428            }
429        }
430
431        extern "C" fn on_drop(callback_data: *mut c_void) {
432            unsafe {
433                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
434            }
435        }
436
437        let (tx, rx) = async_broadcast::broadcast(64);
438        unsafe {
439            let renderer = LKVideoRendererCreate(
440                Box::into_raw(Box::new(tx)) as *mut c_void,
441                on_frame,
442                on_drop,
443            );
444            LKVideoTrackAddRenderer(self.native_track, renderer);
445            rx
446        }
447    }
448}
449
450impl Drop for RemoteVideoTrack {
451    fn drop(&mut self) {
452        unsafe { CFRelease(self.native_track) }
453    }
454}
455
456pub enum RemoteVideoTrackUpdate {
457    Subscribed(Arc<RemoteVideoTrack>),
458    Unsubscribed { publisher_id: Sid, track_id: Sid },
459}
460
461pub struct MacOSDisplay(*const c_void);
462
463impl MacOSDisplay {
464    fn new(ptr: *const c_void) -> Self {
465        unsafe {
466            CFRetain(ptr);
467        }
468        Self(ptr)
469    }
470}
471
472impl Drop for MacOSDisplay {
473    fn drop(&mut self) {
474        unsafe { CFRelease(self.0) }
475    }
476}
477
478#[derive(Clone)]
479pub struct Frame(CVImageBuffer);
480
481impl Frame {
482    pub fn width(&self) -> usize {
483        self.0.width()
484    }
485
486    pub fn height(&self) -> usize {
487        self.0.height()
488    }
489
490    pub fn image(&self) -> CVImageBuffer {
491        self.0.clone()
492    }
493}