live_kit_client.rs

  1use anyhow::{anyhow, Context, Result};
  2use core_foundation::{
  3    array::{CFArray, CFArrayRef},
  4    base::{CFRetain, TCFType},
  5    string::{CFString, CFStringRef},
  6};
  7use futures::{
  8    channel::{mpsc, oneshot},
  9    Future,
 10};
 11use media::core_video::{CVImageBuffer, CVImageBufferRef};
 12use parking_lot::Mutex;
 13use std::{
 14    ffi::c_void,
 15    sync::{Arc, Weak},
 16};
 17
 18pub type Sid = String;
 19
 20extern "C" {
 21    fn LKRelease(object: *const c_void);
 22
 23    fn LKRoomDelegateCreate(
 24        callback_data: *mut c_void,
 25        on_did_subscribe_to_remote_video_track: extern "C" fn(
 26            callback_data: *mut c_void,
 27            publisher_id: CFStringRef,
 28            track_id: CFStringRef,
 29            remote_track: *const c_void,
 30        ),
 31        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
 32            callback_data: *mut c_void,
 33            publisher_id: CFStringRef,
 34            track_id: CFStringRef,
 35        ),
 36    ) -> *const c_void;
 37
 38    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
 39    fn LKRoomConnect(
 40        room: *const c_void,
 41        url: CFStringRef,
 42        token: CFStringRef,
 43        callback: extern "C" fn(*mut c_void, CFStringRef),
 44        callback_data: *mut c_void,
 45    );
 46    fn LKRoomPublishVideoTrack(
 47        room: *const c_void,
 48        track: *const c_void,
 49        callback: extern "C" fn(*mut c_void, CFStringRef),
 50        callback_data: *mut c_void,
 51    );
 52    fn LKRoomVideoTracksForRemoteParticipant(
 53        room: *const c_void,
 54        participant_id: CFStringRef,
 55    ) -> CFArrayRef;
 56
 57    fn LKVideoRendererCreate(
 58        callback_data: *mut c_void,
 59        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef),
 60        on_drop: extern "C" fn(callback_data: *mut c_void),
 61    ) -> *const c_void;
 62
 63    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
 64    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
 65
 66    fn LKDisplaySources(
 67        callback_data: *mut c_void,
 68        callback: extern "C" fn(
 69            callback_data: *mut c_void,
 70            sources: CFArrayRef,
 71            error: CFStringRef,
 72        ),
 73    );
 74    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
 75}
 76
 77pub struct Room {
 78    native_room: *const c_void,
 79    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
 80    _delegate: RoomDelegate,
 81}
 82
 83impl Room {
 84    pub fn new() -> Arc<Self> {
 85        Arc::new_cyclic(|weak_room| {
 86            let delegate = RoomDelegate::new(weak_room.clone());
 87            Self {
 88                native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
 89                remote_video_track_subscribers: Default::default(),
 90                _delegate: delegate,
 91            }
 92        })
 93    }
 94
 95    pub fn connect(&self, url: &str, token: &str) -> impl Future<Output = Result<()>> {
 96        let url = CFString::new(url);
 97        let token = CFString::new(token);
 98        let (did_connect, tx, rx) = Self::build_done_callback();
 99        unsafe {
100            LKRoomConnect(
101                self.native_room,
102                url.as_concrete_TypeRef(),
103                token.as_concrete_TypeRef(),
104                did_connect,
105                tx,
106            )
107        }
108
109        async { rx.await.unwrap().context("error connecting to room") }
110    }
111
112    pub fn publish_video_track(&self, track: &LocalVideoTrack) -> impl Future<Output = Result<()>> {
113        let (did_publish, tx, rx) = Self::build_done_callback();
114        unsafe {
115            LKRoomPublishVideoTrack(self.native_room, track.0, did_publish, tx);
116        }
117        async { rx.await.unwrap().context("error publishing video track") }
118    }
119
120    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
121        unsafe {
122            let tracks = LKRoomVideoTracksForRemoteParticipant(
123                self.native_room,
124                CFString::new(participant_id).as_concrete_TypeRef(),
125            );
126
127            if tracks.is_null() {
128                Vec::new()
129            } else {
130                let tracks = CFArray::wrap_under_get_rule(tracks);
131                tracks
132                    .into_iter()
133                    .map(|native_track| {
134                        let native_track = *native_track;
135                        let id =
136                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
137                                .to_string();
138                        Arc::new(RemoteVideoTrack::new(
139                            native_track,
140                            id,
141                            participant_id.into(),
142                        ))
143                    })
144                    .collect()
145            }
146        }
147    }
148
149    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
150        let (tx, rx) = mpsc::unbounded();
151        self.remote_video_track_subscribers.lock().push(tx);
152        rx
153    }
154
155    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
156        let track = Arc::new(track);
157        self.remote_video_track_subscribers.lock().retain(|tx| {
158            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
159                .is_ok()
160        });
161    }
162
163    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
164        self.remote_video_track_subscribers.lock().retain(|tx| {
165            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
166                publisher_id: publisher_id.clone(),
167                track_id: track_id.clone(),
168            })
169            .is_ok()
170        });
171    }
172
173    fn build_done_callback() -> (
174        extern "C" fn(*mut c_void, CFStringRef),
175        *mut c_void,
176        oneshot::Receiver<Result<()>>,
177    ) {
178        let (tx, rx) = oneshot::channel();
179        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
180            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
181            if error.is_null() {
182                let _ = tx.send(Ok(()));
183            } else {
184                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
185                let _ = tx.send(Err(anyhow!(error)));
186            }
187        }
188        (
189            done_callback,
190            Box::into_raw(Box::new(tx)) as *mut c_void,
191            rx,
192        )
193    }
194}
195
196impl Drop for Room {
197    fn drop(&mut self) {
198        unsafe { LKRelease(self.native_room) }
199    }
200}
201
202struct RoomDelegate {
203    native_delegate: *const c_void,
204    weak_room: *const Room,
205}
206
207impl RoomDelegate {
208    fn new(weak_room: Weak<Room>) -> Self {
209        let weak_room = Weak::into_raw(weak_room);
210        let native_delegate = unsafe {
211            LKRoomDelegateCreate(
212                weak_room as *mut c_void,
213                Self::on_did_subscribe_to_remote_video_track,
214                Self::on_did_unsubscribe_from_remote_video_track,
215            )
216        };
217        Self {
218            native_delegate,
219            weak_room,
220        }
221    }
222
223    extern "C" fn on_did_subscribe_to_remote_video_track(
224        room: *mut c_void,
225        publisher_id: CFStringRef,
226        track_id: CFStringRef,
227        track: *const c_void,
228    ) {
229        let room = unsafe { Weak::from_raw(room as *mut Room) };
230        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
231        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
232        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
233        if let Some(room) = room.upgrade() {
234            room.did_subscribe_to_remote_video_track(track);
235        }
236    }
237
238    extern "C" fn on_did_unsubscribe_from_remote_video_track(
239        room: *mut c_void,
240        publisher_id: CFStringRef,
241        track_id: CFStringRef,
242    ) {
243        let room = unsafe { Weak::from_raw(room as *mut Room) };
244        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
245        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
246        if let Some(room) = room.upgrade() {
247            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
248        }
249        let _ = Weak::into_raw(room);
250    }
251}
252
253impl Drop for RoomDelegate {
254    fn drop(&mut self) {
255        unsafe {
256            LKRelease(self.native_delegate);
257            let _ = Weak::from_raw(self.weak_room);
258        }
259    }
260}
261
/// A locally created video track. Wraps the native track pointer, which is
/// passed to `LKRelease` when this value is dropped.
pub struct LocalVideoTrack(*const c_void);
263
264impl LocalVideoTrack {
265    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
266        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
267    }
268}
269
270impl Drop for LocalVideoTrack {
271    fn drop(&mut self) {
272        unsafe { LKRelease(self.0) }
273    }
274}
275
276#[derive(Debug)]
277pub struct RemoteVideoTrack {
278    native_track: *const c_void,
279    sid: Sid,
280    publisher_id: String,
281}
282
283impl RemoteVideoTrack {
284    fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
285        unsafe {
286            CFRetain(native_track);
287        }
288        Self {
289            native_track,
290            sid,
291            publisher_id,
292        }
293    }
294
295    pub fn sid(&self) -> &str {
296        &self.sid
297    }
298
299    pub fn publisher_id(&self) -> &str {
300        &self.publisher_id
301    }
302
303    pub fn add_renderer<F>(&self, callback: F)
304    where
305        F: 'static + FnMut(CVImageBuffer),
306    {
307        extern "C" fn on_frame<F>(callback_data: *mut c_void, frame: CVImageBufferRef)
308        where
309            F: FnMut(CVImageBuffer),
310        {
311            unsafe {
312                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
313                let callback = &mut *(callback_data as *mut F);
314                callback(buffer);
315            }
316        }
317
318        extern "C" fn on_drop<F>(callback_data: *mut c_void) {
319            unsafe {
320                let _ = Box::from_raw(callback_data as *mut F);
321            }
322        }
323
324        let callback_data = Box::into_raw(Box::new(callback));
325        unsafe {
326            let renderer =
327                LKVideoRendererCreate(callback_data as *mut c_void, on_frame::<F>, on_drop::<F>);
328            LKVideoTrackAddRenderer(self.native_track, renderer);
329        }
330    }
331}
332
333impl Drop for RemoteVideoTrack {
334    fn drop(&mut self) {
335        unsafe { LKRelease(self.native_track) }
336    }
337}
338
339pub enum RemoteVideoTrackUpdate {
340    Subscribed(Arc<RemoteVideoTrack>),
341    Unsubscribed { publisher_id: Sid, track_id: Sid },
342}
343
/// A display source returned by `display_sources`. Wraps the native pointer,
/// which is retained in `new` and released on drop.
pub struct MacOSDisplay(*const c_void);
345
346impl MacOSDisplay {
347    fn new(ptr: *const c_void) -> Self {
348        unsafe {
349            CFRetain(ptr);
350        }
351        Self(ptr)
352    }
353}
354
355impl Drop for MacOSDisplay {
356    fn drop(&mut self) {
357        unsafe { LKRelease(self.0) }
358    }
359}
360
361pub fn display_sources() -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
362    extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
363        unsafe {
364            let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
365
366            if sources.is_null() {
367                let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
368            } else {
369                let sources = CFArray::wrap_under_get_rule(sources)
370                    .into_iter()
371                    .map(|source| MacOSDisplay::new(*source))
372                    .collect();
373
374                let _ = tx.send(Ok(sources));
375            }
376        }
377    }
378
379    let (tx, rx) = oneshot::channel();
380
381    unsafe {
382        LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
383    }
384
385    async move { rx.await.unwrap() }
386}
387
#[cfg(test)]
mod tests {
    /// Placeholder test with an empty body; it exercises none of the client
    /// code.
    #[test]
    fn test_client() {
        // Intentionally empty.
    }
}