prod.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11pub use media::core_video::CVImageBuffer;
 12use media::core_video::CVImageBufferRef;
 13use parking_lot::Mutex;
 14use postage::watch;
 15use std::{
 16    ffi::c_void,
 17    sync::{Arc, Weak},
 18};
 19
   // Foreign functions implemented outside this file; only their signatures are visible here.
   // NOTE(review): ownership of every returned pointer (+0 vs +1 retained) is decided by the
   // native implementation — confirm against it before changing any retain/release call.
 20extern "C" {
       /// Creates a native room delegate wired to the three callbacks.
       ///
       /// `callback_data` is opaque to the native side. `RoomDelegate::new` in this file passes
       /// a raw `Weak<Room>` and its callbacks read it back from their first argument.
 21    fn LKRoomDelegateCreate(
 22        callback_data: *mut c_void,
 23        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
 24        on_did_subscribe_to_remote_video_track: extern "C" fn(
 25            callback_data: *mut c_void,
 26            publisher_id: CFStringRef,
 27            track_id: CFStringRef,
 28            remote_track: *const c_void,
 29        ),
 30        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 31            callback_data: *mut c_void,
 32            publisher_id: CFStringRef,
 33            track_id: CFStringRef,
 34        ),
 35    ) -> *const c_void;
 36
       /// Creates a native room that uses `delegate`. `Room::drop` releases the returned
       /// pointer with `CFRelease`.
 37    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
       /// Starts connecting `room` to `url` with `token`.
       ///
       /// `callback` receives `callback_data` and an error string; the Rust callback built by
       /// `Room::build_done_callback` treats a null error as success.
 38    fn LKRoomConnect(
 39        room: *const c_void,
 40        url: CFStringRef,
 41        token: CFStringRef,
 42        callback: extern "C" fn(*mut c_void, CFStringRef),
 43        callback_data: *mut c_void,
 44    );
       /// Disconnects `room`. Called from `Room::drop` before the room is released.
 45    fn LKRoomDisconnect(room: *const c_void);
       /// Publishes the local video `track` to `room`.
       ///
       /// `callback` receives `callback_data`, a publication pointer, and an error string; the
       /// Rust callback in `Room::publish_video_track` treats a null error as success.
 46    fn LKRoomPublishVideoTrack(
 47        room: *const c_void,
 48        track: *const c_void,
 49        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
 50        callback_data: *mut c_void,
 51    );
       /// Unpublishes `publication` from `room`.
 52    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
       /// Returns an array of native video tracks for `participant_id`.
       /// `Room::remote_video_tracks` treats a null return as "no tracks".
 53    fn LKRoomVideoTracksForRemoteParticipant(
 54        room: *const c_void,
 55        participant_id: CFStringRef,
 56    ) -> CFArrayRef;
 57
       /// Creates a native video renderer.
       ///
       /// `on_frame` is called with `callback_data` and an image buffer and returns a `bool`;
       /// `on_drop` is called with `callback_data` and is where `RemoteVideoTrack::frames`
       /// frees it. NOTE(review): what the native side does with `on_frame`'s return value is
       /// not visible here — confirm.
 58    fn LKVideoRendererCreate(
 59        callback_data: *mut c_void,
 60        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
 61        on_drop: extern "C" fn(callback_data: *mut c_void),
 62    ) -> *const c_void;
 63
       /// Attaches `renderer` to `track`.
 64    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
       /// Returns the sid string of a remote video `track`.
 65    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
 66
       /// Requests the available display sources.
       ///
       /// `callback` receives `callback_data`, an array of sources, and an error string;
       /// `Room::display_sources` treats a null `sources` array as failure.
 67    fn LKDisplaySources(
 68        callback_data: *mut c_void,
 69        callback: extern "C" fn(
 70            callback_data: *mut c_void,
 71            sources: CFArrayRef,
 72            error: CFStringRef,
 73        ),
 74    );
       /// Creates a screen-share track for `display`. `LocalVideoTrack::drop` releases the
       /// returned pointer with `CFRelease`.
 75    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
 76}
 77
   /// String identifier used in this module for remote track ids and publisher ids.
 78pub type Sid = String;
 79
   /// Connection status of a [`Room`], published through the watch channel returned by
   /// [`Room::status`].
 80#[derive(Clone, Eq, PartialEq)]
 81pub enum ConnectionState {
       /// Initial state of a new room; also set again when the delegate reports a disconnect.
 82    Disconnected,
       /// Set after `Room::connect` completes successfully, with the URL and token that were
       /// used for the connection.
 83    Connected { url: String, token: String },
 84}
 85
   /// Rust-side handle for a native room created through `LKRoomCreate`.
 86pub struct Room {
       /// Opaque pointer returned by `LKRoomCreate`; released with `CFRelease` in `Drop`.
 87    native_room: *const c_void,
       /// Both halves of the connection-state watch channel. The sender is written by
       /// `connect` and `did_disconnect`; the receiver is cloned out by `status`.
 88    connection: Mutex<(
 89        watch::Sender<ConnectionState>,
 90        watch::Receiver<ConnectionState>,
 91    )>,
       /// Senders registered by `remote_video_track_updates`. A sender is removed the first
       /// time sending to it fails.
 92    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
       /// Keeps the native delegate (and the raw `Weak<Room>` it holds) alive as long as the
       /// room. Declared last, so it is dropped after the other fields.
 93    _delegate: RoomDelegate,
 94}
 95
 96impl Room {
 97    pub fn new() -> Arc<Self> {
 98        Arc::new_cyclic(|weak_room| {
 99            let delegate = RoomDelegate::new(weak_room.clone());
100            Self {
101                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
102                connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
103                remote_video_track_subscribers: Default::default(),
104                _delegate: delegate,
105            }
106        })
107    }
108
109    pub fn status(&self) -> watch::Receiver<ConnectionState> {
110        self.connection.lock().1.clone()
111    }
112
113    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
114        let url = CFString::new(url);
115        let token = CFString::new(token);
116        let (did_connect, tx, rx) = Self::build_done_callback();
117        unsafe {
118            LKRoomConnect(
119                self.native_room,
120                url.as_concrete_TypeRef(),
121                token.as_concrete_TypeRef(),
122                did_connect,
123                tx,
124            )
125        }
126
127        let this = self.clone();
128        let url = url.to_string();
129        let token = token.to_string();
130        async move {
131            rx.await.unwrap().context("error connecting to room")?;
132            *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
133            Ok(())
134        }
135    }
136
137    fn did_disconnect(&self) {
138        *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
139    }
140
141    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
142        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
143            unsafe {
144                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
145
146                if sources.is_null() {
147                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
148                } else {
149                    let sources = CFArray::wrap_under_get_rule(sources)
150                        .into_iter()
151                        .map(|source| MacOSDisplay::new(*source))
152                        .collect();
153
154                    let _ = tx.send(Ok(sources));
155                }
156            }
157        }
158
159        let (tx, rx) = oneshot::channel();
160
161        unsafe {
162            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
163        }
164
165        async move { rx.await.unwrap() }
166    }
167
168    pub fn publish_video_track(
169        self: &Arc<Self>,
170        track: &LocalVideoTrack,
171    ) -> impl Future<Output = Result<LocalTrackPublication>> {
172        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
173        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
174            let tx =
175                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
176            if error.is_null() {
177                let _ = tx.send(Ok(LocalTrackPublication(publication)));
178            } else {
179                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
180                let _ = tx.send(Err(anyhow!(error)));
181            }
182        }
183        unsafe {
184            LKRoomPublishVideoTrack(
185                self.native_room,
186                track.0,
187                callback,
188                Box::into_raw(Box::new(tx)) as *mut c_void,
189            );
190        }
191        async { rx.await.unwrap().context("error publishing video track") }
192    }
193
194    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
195        unsafe {
196            LKRoomUnpublishTrack(self.native_room, publication.0);
197        }
198    }
199
200    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
201        unsafe {
202            let tracks = LKRoomVideoTracksForRemoteParticipant(
203                self.native_room,
204                CFString::new(participant_id).as_concrete_TypeRef(),
205            );
206
207            if tracks.is_null() {
208                Vec::new()
209            } else {
210                let tracks = CFArray::wrap_under_get_rule(tracks);
211                tracks
212                    .into_iter()
213                    .map(|native_track| {
214                        let native_track = *native_track;
215                        let id =
216                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
217                                .to_string();
218                        Arc::new(RemoteVideoTrack::new(
219                            native_track,
220                            id,
221                            participant_id.into(),
222                        ))
223                    })
224                    .collect()
225            }
226        }
227    }
228
229    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
230        let (tx, rx) = mpsc::unbounded();
231        self.remote_video_track_subscribers.lock().push(tx);
232        rx
233    }
234
235    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
236        let track = Arc::new(track);
237        self.remote_video_track_subscribers.lock().retain(|tx| {
238            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
239                .is_ok()
240        });
241    }
242
243    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
244        self.remote_video_track_subscribers.lock().retain(|tx| {
245            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
246                publisher_id: publisher_id.clone(),
247                track_id: track_id.clone(),
248            })
249            .is_ok()
250        });
251    }
252
253    fn build_done_callback() -> (
254        extern "C" fn(*mut c_void, CFStringRef),
255        *mut c_void,
256        oneshot::Receiver<Result<()>>,
257    ) {
258        let (tx, rx) = oneshot::channel();
259        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
260            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
261            if error.is_null() {
262                let _ = tx.send(Ok(()));
263            } else {
264                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
265                let _ = tx.send(Err(anyhow!(error)));
266            }
267        }
268        (
269            done_callback,
270            Box::into_raw(Box::new(tx)) as *mut c_void,
271            rx,
272        )
273    }
274}
275
276impl Drop for Room {
277    fn drop(&mut self) {
278        unsafe {
279            LKRoomDisconnect(self.native_room);
280            CFRelease(self.native_room);
281        }
282    }
283}
284
   /// Owns the native room delegate and the raw `Weak<Room>` handed to it as callback data.
285struct RoomDelegate {
       /// Pointer returned by `LKRoomDelegateCreate`; released with `CFRelease` in `Drop`.
286    native_delegate: *const c_void,
       /// Result of `Weak::into_raw`. The callbacks borrow it without consuming it; `Drop`
       /// turns it back into a `Weak` so the weak count is decremented.
287    weak_room: *const Room,
288}
289
290impl RoomDelegate {
291    fn new(weak_room: Weak<Room>) -> Self {
292        let weak_room = Weak::into_raw(weak_room);
293        let native_delegate = unsafe {
294            LKRoomDelegateCreate(
295                weak_room as *mut c_void,
296                Self::on_did_disconnect,
297                Self::on_did_subscribe_to_remote_video_track,
298                Self::on_did_unsubscribe_from_remote_video_track,
299            )
300        };
301        Self {
302            native_delegate,
303            weak_room,
304        }
305    }
306
307    extern "C" fn on_did_disconnect(room: *mut c_void) {
308        let room = unsafe { Weak::from_raw(room as *mut Room) };
309        if let Some(room) = room.upgrade() {
310            room.did_disconnect();
311        }
312        let _ = Weak::into_raw(room);
313    }
314
315    extern "C" fn on_did_subscribe_to_remote_video_track(
316        room: *mut c_void,
317        publisher_id: CFStringRef,
318        track_id: CFStringRef,
319        track: *const c_void,
320    ) {
321        let room = unsafe { Weak::from_raw(room as *mut Room) };
322        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
323        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
324        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
325        if let Some(room) = room.upgrade() {
326            room.did_subscribe_to_remote_video_track(track);
327        }
328        let _ = Weak::into_raw(room);
329    }
330
331    extern "C" fn on_did_unsubscribe_from_remote_video_track(
332        room: *mut c_void,
333        publisher_id: CFStringRef,
334        track_id: CFStringRef,
335    ) {
336        let room = unsafe { Weak::from_raw(room as *mut Room) };
337        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
338        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
339        if let Some(room) = room.upgrade() {
340            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
341        }
342        let _ = Weak::into_raw(room);
343    }
344}
345
346impl Drop for RoomDelegate {
347    fn drop(&mut self) {
348        unsafe {
349            CFRelease(self.native_delegate);
350            let _ = Weak::from_raw(self.weak_room);
351        }
352    }
353}
354
355pub struct LocalVideoTrack(*const c_void);
356
357impl LocalVideoTrack {
358    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
359        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
360    }
361}
362
363impl Drop for LocalVideoTrack {
364    fn drop(&mut self) {
365        unsafe { CFRelease(self.0) }
366    }
367}
368
369pub struct LocalTrackPublication(*const c_void);
370
371impl Drop for LocalTrackPublication {
372    fn drop(&mut self) {
373        unsafe { CFRelease(self.0) }
374    }
375}
376
377#[derive(Debug)]
378pub struct RemoteVideoTrack {
379    native_track: *const c_void,
380    sid: Sid,
381    publisher_id: String,
382}
383
384impl RemoteVideoTrack {
385    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
386        unsafe {
387            CFRetain(native_track);
388        }
389        Self {
390            native_track,
391            sid,
392            publisher_id,
393        }
394    }
395
396    pub fn sid(&self) -> &str {
397        &self.sid
398    }
399
400    pub fn publisher_id(&self) -> &str {
401        &self.publisher_id
402    }
403
404    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
405        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
406            unsafe {
407                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
408                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
409                let result = tx.try_broadcast(Frame(buffer));
410                let _ = Box::into_raw(tx);
411                match result {
412                    Ok(_) => true,
413                    Err(async_broadcast::TrySendError::Closed(_))
414                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
415                        log::warn!("no active receiver for frame");
416                        false
417                    }
418                    Err(async_broadcast::TrySendError::Full(_)) => {
419                        log::warn!("skipping frame as receiver is not keeping up");
420                        true
421                    }
422                }
423            }
424        }
425
426        extern "C" fn on_drop(callback_data: *mut c_void) {
427            unsafe {
428                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
429            }
430        }
431
432        let (tx, rx) = async_broadcast::broadcast(64);
433        unsafe {
434            let renderer = LKVideoRendererCreate(
435                Box::into_raw(Box::new(tx)) as *mut c_void,
436                on_frame,
437                on_drop,
438            );
439            LKVideoTrackAddRenderer(self.native_track, renderer);
440            rx
441        }
442    }
443}
444
445impl Drop for RemoteVideoTrack {
446    fn drop(&mut self) {
447        unsafe { CFRelease(self.native_track) }
448    }
449}
450
   /// Update delivered to the receivers returned by `Room::remote_video_track_updates`.
451pub enum RemoteVideoTrackUpdate {
       /// A remote video track was subscribed to; carries the shared track wrapper.
452    Subscribed(Arc<RemoteVideoTrack>),
       /// A remote video track was unsubscribed from, identified by publisher and track id.
453    Unsubscribed { publisher_id: Sid, track_id: Sid },
454}
455
456pub struct MacOSDisplay(*const c_void);
457
458impl MacOSDisplay {
459    fn new(ptr: *const c_void) -> Self {
460        unsafe {
461            CFRetain(ptr);
462        }
463        Self(ptr)
464    }
465}
466
467impl Drop for MacOSDisplay {
468    fn drop(&mut self) {
469        unsafe { CFRelease(self.0) }
470    }
471}
472
473#[derive(Clone)]
474pub struct Frame(CVImageBuffer);
475
476impl Frame {
477    pub fn width(&self) -> usize {
478        self.0.width()
479    }
480
481    pub fn height(&self) -> usize {
482        self.0.height()
483    }
484
485    pub fn image(&self) -> CVImageBuffer {
486        self.0.clone()
487    }
488}