live_kit_client.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRelease, CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11use media::core_video::{CVImageBuffer, CVImageBufferRef};
 12use parking_lot::Mutex;
 13use std::{
 14    ffi::c_void,
 15    sync::{Arc, Weak},
 16};
 17
/// Identifier string used in this file for track ids and publisher ids (see
/// `RemoteVideoTrack` and `RemoteVideoTrackUpdate`).
pub type Sid = String;
 19
// Entry points of the native LiveKit bridge. Only the declarations are visible
// in this file; every object crosses the boundary as an untyped pointer.
// NOTE(review): the retain/release contract of each returned handle is defined
// by the native implementation — confirm it there before changing how this
// file retains or releases any of these pointers.
extern "C" {
    /// Creates a native room delegate that reports track subscription changes
    /// through the two callbacks. `callback_data` is expected to come back as
    /// the first argument of each callback (this file passes a raw
    /// `Weak<Room>` pointer).
    fn LKRoomDelegateCreate(
        callback_data: *mut c_void,
        on_did_subscribe_to_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
            remote_track: *const c_void,
        ),
        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
        ),
    ) -> *const c_void;

    /// Creates a native room associated with `delegate`.
    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
    /// Starts connecting `room` to `url` with `token`. `callback` receives
    /// `callback_data` plus an error string; callers in this file treat a null
    /// error as success.
    fn LKRoomConnect(
        room: *const c_void,
        url: CFStringRef,
        token: CFStringRef,
        callback: extern "C" fn(*mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    /// Disconnects `room`; called from `Room`'s `Drop` before the handle is
    /// released.
    fn LKRoomDisconnect(room: *const c_void);
    /// Publishes `track` in `room`. `callback` receives `callback_data`, a
    /// publication handle, and an error string; callers in this file treat a
    /// null error as success.
    fn LKRoomPublishVideoTrack(
        room: *const c_void,
        track: *const c_void,
        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    /// Unpublishes the track behind `publication` from `room`.
    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
    /// Returns the video track handles of the given remote participant. The
    /// result may be null; this file treats null as "no tracks".
    fn LKRoomVideoTracksForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    /// Creates a native video renderer. `on_frame` receives `callback_data`
    /// and an image buffer; `on_drop` receives `callback_data` only, and this
    /// file uses it to free the boxed Rust closure.
    /// NOTE(review): presumably `on_frame` runs once per decoded frame and
    /// `on_drop` once when the renderer is destroyed — confirm natively.
    fn LKVideoRendererCreate(
        callback_data: *mut c_void,
        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef),
        on_drop: extern "C" fn(callback_data: *mut c_void),
    ) -> *const c_void;

    /// Attaches `renderer` to `track`.
    /// NOTE(review): this file never releases the renderer afterwards, so the
    /// native side presumably takes ownership of it — confirm.
    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
    /// Returns the sid string of a remote video track.
    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;

    /// Asynchronously lists display sources. `callback` receives
    /// `callback_data` and either an array of sources or an error string; this
    /// file treats a null `sources` as failure.
    fn LKDisplaySources(
        callback_data: *mut c_void,
        callback: extern "C" fn(
            callback_data: *mut c_void,
            sources: CFArrayRef,
            error: CFStringRef,
        ),
    );
    /// Creates a screen-share video track for `display`.
    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
}
 76
/// A room backed by a native room object.
///
/// Instances are created through `Room::new`, which returns the room inside an
/// `Arc` so the delegate can hold a weak back-reference to it.
pub struct Room {
    /// Handle returned by `LKRoomCreate`; disconnected and released in `Drop`.
    native_room: *const c_void,
    /// One sender per receiver handed out by `remote_video_track_updates`.
    /// Senders whose receiver is gone are pruned the next time an update is sent.
    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
    /// Held only to keep the native delegate (and the weak room pointer it
    /// carries) alive for as long as the room exists.
    _delegate: RoomDelegate,
}
 82
 83impl Room {
 84    pub fn new() -> Arc<Self> {
 85        Arc::new_cyclic(|weak_room| {
 86            let delegate = RoomDelegate::new(weak_room.clone());
 87            Self {
 88                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
 89                remote_video_track_subscribers: Default::default(),
 90                _delegate: delegate,
 91            }
 92        })
 93    }
 94
 95    pub fn connect(&self, url: &str, token: &str) -> impl Future<Output = Result<()>> {
 96        let url = CFString::new(url);
 97        let token = CFString::new(token);
 98        let (did_connect, tx, rx) = Self::build_done_callback();
 99        unsafe {
100            LKRoomConnect(
101                self.native_room,
102                url.as_concrete_TypeRef(),
103                token.as_concrete_TypeRef(),
104                did_connect,
105                tx,
106            )
107        }
108
109        async { rx.await.unwrap().context("error connecting to room") }
110    }
111
112    pub fn publish_video_track(
113        &self,
114        track: &LocalVideoTrack,
115    ) -> impl Future<Output = Result<LocalTrackPublication>> {
116        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
117        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
118            let tx =
119                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
120            if error.is_null() {
121                let _ = tx.send(Ok(LocalTrackPublication(publication)));
122            } else {
123                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
124                let _ = tx.send(Err(anyhow!(error)));
125            }
126        }
127        unsafe {
128            LKRoomPublishVideoTrack(
129                self.native_room,
130                track.0,
131                callback,
132                Box::into_raw(Box::new(tx)) as *mut c_void,
133            );
134        }
135        async { rx.await.unwrap().context("error publishing video track") }
136    }
137
138    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
139        unsafe {
140            LKRoomUnpublishTrack(self.native_room, publication.0);
141        }
142    }
143
144    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
145        unsafe {
146            let tracks = LKRoomVideoTracksForRemoteParticipant(
147                self.native_room,
148                CFString::new(participant_id).as_concrete_TypeRef(),
149            );
150
151            if tracks.is_null() {
152                Vec::new()
153            } else {
154                let tracks = CFArray::wrap_under_get_rule(tracks);
155                tracks
156                    .into_iter()
157                    .map(|native_track| {
158                        let native_track = *native_track;
159                        let id =
160                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
161                                .to_string();
162                        Arc::new(RemoteVideoTrack::new(
163                            native_track,
164                            id,
165                            participant_id.into(),
166                        ))
167                    })
168                    .collect()
169            }
170        }
171    }
172
173    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
174        let (tx, rx) = mpsc::unbounded();
175        self.remote_video_track_subscribers.lock().push(tx);
176        rx
177    }
178
179    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
180        let track = Arc::new(track);
181        self.remote_video_track_subscribers.lock().retain(|tx| {
182            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
183                .is_ok()
184        });
185    }
186
187    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
188        self.remote_video_track_subscribers.lock().retain(|tx| {
189            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
190                publisher_id: publisher_id.clone(),
191                track_id: track_id.clone(),
192            })
193            .is_ok()
194        });
195    }
196
197    fn build_done_callback() -> (
198        extern "C" fn(*mut c_void, CFStringRef),
199        *mut c_void,
200        oneshot::Receiver<Result<()>>,
201    ) {
202        let (tx, rx) = oneshot::channel();
203        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
204            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
205            if error.is_null() {
206                let _ = tx.send(Ok(()));
207            } else {
208                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
209                let _ = tx.send(Err(anyhow!(error)));
210            }
211        }
212        (
213            done_callback,
214            Box::into_raw(Box::new(tx)) as *mut c_void,
215            rx,
216        )
217    }
218}
219
220impl Drop for Room {
221    fn drop(&mut self) {
222        unsafe {
223            LKRoomDisconnect(self.native_room);
224            CFRelease(self.native_room);
225        }
226    }
227}
228
/// Owns the native delegate object and the raw `Weak<Room>` pointer that is
/// registered as its callback data.
struct RoomDelegate {
    /// Handle returned by `LKRoomDelegateCreate`; released in `Drop`.
    native_delegate: *const c_void,
    /// Produced by `Weak::into_raw` in `new`; converted back into a `Weak`
    /// (and thereby dropped) in `Drop`.
    weak_room: *const Room,
}
233
234impl RoomDelegate {
235    fn new(weak_room: Weak<Room>) -> Self {
236        let weak_room = Weak::into_raw(weak_room);
237        let native_delegate = unsafe {
238            LKRoomDelegateCreate(
239                weak_room as *mut c_void,
240                Self::on_did_subscribe_to_remote_video_track,
241                Self::on_did_unsubscribe_from_remote_video_track,
242            )
243        };
244        Self {
245            native_delegate,
246            weak_room,
247        }
248    }
249
250    extern "C" fn on_did_subscribe_to_remote_video_track(
251        room: *mut c_void,
252        publisher_id: CFStringRef,
253        track_id: CFStringRef,
254        track: *const c_void,
255    ) {
256        let room = unsafe { Weak::from_raw(room as *mut Room) };
257        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
258        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
259        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
260        if let Some(room) = room.upgrade() {
261            room.did_subscribe_to_remote_video_track(track);
262        }
263        let _ = Weak::into_raw(room);
264    }
265
266    extern "C" fn on_did_unsubscribe_from_remote_video_track(
267        room: *mut c_void,
268        publisher_id: CFStringRef,
269        track_id: CFStringRef,
270    ) {
271        let room = unsafe { Weak::from_raw(room as *mut Room) };
272        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
273        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
274        if let Some(room) = room.upgrade() {
275            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
276        }
277        let _ = Weak::into_raw(room);
278    }
279}
280
281impl Drop for RoomDelegate {
282    fn drop(&mut self) {
283        unsafe {
284            CFRelease(self.native_delegate);
285            let _ = Weak::from_raw(self.weak_room);
286        }
287    }
288}
289
290pub struct LocalVideoTrack(*const c_void);
291
292impl LocalVideoTrack {
293    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
294        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
295    }
296}
297
298impl Drop for LocalVideoTrack {
299    fn drop(&mut self) {
300        unsafe { CFRelease(self.0) }
301    }
302}
303
304pub struct LocalTrackPublication(*const c_void);
305
306impl Drop for LocalTrackPublication {
307    fn drop(&mut self) {
308        unsafe { CFRelease(self.0) }
309    }
310}
311
/// A video track published by a remote participant.
#[derive(Debug)]
pub struct RemoteVideoTrack {
    /// Native track handle; retained in `new` and released in `Drop`.
    native_track: *const c_void,
    /// Id of the track, as supplied by the caller of `new`.
    sid: Sid,
    /// Id of the participant that published the track.
    publisher_id: String,
}
318
319impl RemoteVideoTrack {
320    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
321        unsafe {
322            CFRetain(native_track);
323        }
324        Self {
325            native_track,
326            sid,
327            publisher_id,
328        }
329    }
330
331    pub fn sid(&self) -> &str {
332        &self.sid
333    }
334
335    pub fn publisher_id(&self) -> &str {
336        &self.publisher_id
337    }
338
339    pub fn add_renderer<F>(&self, callback: F)
340    where
341        F: 'static + FnMut(CVImageBuffer),
342    {
343        extern "C" fn on_frame<F>(callback_data: *mut c_void, frame: CVImageBufferRef)
344        where
345            F: FnMut(CVImageBuffer),
346        {
347            unsafe {
348                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
349                let callback = &mut *(callback_data as *mut F);
350                callback(buffer);
351            }
352        }
353
354        extern "C" fn on_drop<F>(callback_data: *mut c_void) {
355            unsafe {
356                let _ = Box::from_raw(callback_data as *mut F);
357            }
358        }
359
360        let callback_data = Box::into_raw(Box::new(callback));
361        unsafe {
362            let renderer =
363                LKVideoRendererCreate(callback_data as *mut c_void, on_frame::<F>, on_drop::<F>);
364            LKVideoTrackAddRenderer(self.native_track, renderer);
365        }
366    }
367}
368
369impl Drop for RemoteVideoTrack {
370    fn drop(&mut self) {
371        unsafe { CFRelease(self.native_track) }
372    }
373}
374
/// Event delivered to the receivers returned by
/// `Room::remote_video_track_updates`.
pub enum RemoteVideoTrackUpdate {
    /// A remote video track was subscribed to.
    Subscribed(Arc<RemoteVideoTrack>),
    /// The track `track_id` published by `publisher_id` was unsubscribed from.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
379
380pub struct MacOSDisplay(*const c_void);
381
382impl MacOSDisplay {
383    fn new(ptr: *const c_void) -> Self {
384        unsafe {
385            CFRetain(ptr);
386        }
387        Self(ptr)
388    }
389}
390
391impl Drop for MacOSDisplay {
392    fn drop(&mut self) {
393        unsafe { CFRelease(self.0) }
394    }
395}
396
397pub fn display_sources() -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
398    extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
399        unsafe {
400            let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
401
402            if sources.is_null() {
403                let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
404            } else {
405                let sources = CFArray::wrap_under_get_rule(sources)
406                    .into_iter()
407                    .map(|source| MacOSDisplay::new(*source))
408                    .collect();
409
410                let _ = tx.send(Ok(sources));
411            }
412        }
413    }
414
415    let (tx, rx) = oneshot::channel();
416
417    unsafe {
418        LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
419    }
420
421    async move { rx.await.unwrap() }
422}
423
#[cfg(test)]
mod tests {
    // Placeholder: the test body is empty, so nothing in this file is
    // exercised by it yet.
    #[test]
    fn test_client() {}
}