1use anyhow::{anyhow, Context, Result};
2use core_foundation::{
3 array::{CFArray, CFArrayRef},
4 base::{CFRetain, TCFType},
5 string::{CFString, CFStringRef},
6};
7use futures::{
8 channel::{mpsc, oneshot},
9 Future,
10};
11use media::core_video::{CVImageBuffer, CVImageBufferRef};
12use parking_lot::Mutex;
13use std::{
14 ffi::c_void,
15 sync::{Arc, Weak},
16};
17
18pub type Sid = String;
19
20extern "C" {
21 fn LKRelease(object: *const c_void);
22
23 fn LKRoomDelegateCreate(
24 callback_data: *mut c_void,
25 on_did_subscribe_to_remote_video_track: extern "C" fn(
26 callback_data: *mut c_void,
27 publisher_id: CFStringRef,
28 track_id: CFStringRef,
29 remote_track: *const c_void,
30 ),
31 on_did_unsubscribe_from_remote_video_track: extern "C" fn(
32 callback_data: *mut c_void,
33 publisher_id: CFStringRef,
34 track_id: CFStringRef,
35 ),
36 ) -> *const c_void;
37
38 fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
39 fn LKRoomConnect(
40 room: *const c_void,
41 url: CFStringRef,
42 token: CFStringRef,
43 callback: extern "C" fn(*mut c_void, CFStringRef),
44 callback_data: *mut c_void,
45 );
46 fn LKRoomPublishVideoTrack(
47 room: *const c_void,
48 track: *const c_void,
49 callback: extern "C" fn(*mut c_void, CFStringRef),
50 callback_data: *mut c_void,
51 );
52 fn LKRoomVideoTracksForRemoteParticipant(
53 room: *const c_void,
54 participant_id: CFStringRef,
55 ) -> CFArrayRef;
56
57 fn LKVideoRendererCreate(
58 callback_data: *mut c_void,
59 on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef),
60 on_drop: extern "C" fn(callback_data: *mut c_void),
61 ) -> *const c_void;
62
63 fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
64 fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
65
66 fn LKDisplaySources(
67 callback_data: *mut c_void,
68 callback: extern "C" fn(
69 callback_data: *mut c_void,
70 sources: CFArrayRef,
71 error: CFStringRef,
72 ),
73 );
74 fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
75}
76
77pub struct Room {
78 native_room: *const c_void,
79 remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
80 _delegate: RoomDelegate,
81}
82
83impl Room {
84 pub fn new() -> Arc<Self> {
85 Arc::new_cyclic(|weak_room| {
86 let delegate = RoomDelegate::new(weak_room.clone());
87 Self {
88 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
89 remote_video_track_subscribers: Default::default(),
90 _delegate: delegate,
91 }
92 })
93 }
94
95 pub fn connect(&self, url: &str, token: &str) -> impl Future<Output = Result<()>> {
96 let url = CFString::new(url);
97 let token = CFString::new(token);
98 let (did_connect, tx, rx) = Self::build_done_callback();
99 unsafe {
100 LKRoomConnect(
101 self.native_room,
102 url.as_concrete_TypeRef(),
103 token.as_concrete_TypeRef(),
104 did_connect,
105 tx,
106 )
107 }
108
109 async { rx.await.unwrap().context("error connecting to room") }
110 }
111
112 pub fn publish_video_track(&self, track: &LocalVideoTrack) -> impl Future<Output = Result<()>> {
113 let (did_publish, tx, rx) = Self::build_done_callback();
114 unsafe {
115 LKRoomPublishVideoTrack(self.native_room, track.0, did_publish, tx);
116 }
117 async { rx.await.unwrap().context("error publishing video track") }
118 }
119
120 pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
121 unsafe {
122 let tracks = LKRoomVideoTracksForRemoteParticipant(
123 self.native_room,
124 CFString::new(participant_id).as_concrete_TypeRef(),
125 );
126
127 if tracks.is_null() {
128 Vec::new()
129 } else {
130 let tracks = CFArray::wrap_under_get_rule(tracks);
131 tracks
132 .into_iter()
133 .map(|native_track| {
134 let native_track = *native_track;
135 let id =
136 CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
137 .to_string();
138 Arc::new(RemoteVideoTrack::new(
139 native_track,
140 id,
141 participant_id.into(),
142 ))
143 })
144 .collect()
145 }
146 }
147 }
148
149 pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
150 let (tx, rx) = mpsc::unbounded();
151 self.remote_video_track_subscribers.lock().push(tx);
152 rx
153 }
154
155 fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
156 let track = Arc::new(track);
157 self.remote_video_track_subscribers.lock().retain(|tx| {
158 tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
159 .is_ok()
160 });
161 }
162
163 fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
164 self.remote_video_track_subscribers.lock().retain(|tx| {
165 tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
166 publisher_id: publisher_id.clone(),
167 track_id: track_id.clone(),
168 })
169 .is_ok()
170 });
171 }
172
173 fn build_done_callback() -> (
174 extern "C" fn(*mut c_void, CFStringRef),
175 *mut c_void,
176 oneshot::Receiver<Result<()>>,
177 ) {
178 let (tx, rx) = oneshot::channel();
179 extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
180 let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
181 if error.is_null() {
182 let _ = tx.send(Ok(()));
183 } else {
184 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
185 let _ = tx.send(Err(anyhow!(error)));
186 }
187 }
188 (
189 done_callback,
190 Box::into_raw(Box::new(tx)) as *mut c_void,
191 rx,
192 )
193 }
194}
195
196impl Drop for Room {
197 fn drop(&mut self) {
198 unsafe { LKRelease(self.native_room) }
199 }
200}
201
202struct RoomDelegate {
203 native_delegate: *const c_void,
204 weak_room: *const Room,
205}
206
207impl RoomDelegate {
208 fn new(weak_room: Weak<Room>) -> Self {
209 let weak_room = Weak::into_raw(weak_room);
210 let native_delegate = unsafe {
211 LKRoomDelegateCreate(
212 weak_room as *mut c_void,
213 Self::on_did_subscribe_to_remote_video_track,
214 Self::on_did_unsubscribe_from_remote_video_track,
215 )
216 };
217 Self {
218 native_delegate,
219 weak_room,
220 }
221 }
222
223 extern "C" fn on_did_subscribe_to_remote_video_track(
224 room: *mut c_void,
225 publisher_id: CFStringRef,
226 track_id: CFStringRef,
227 track: *const c_void,
228 ) {
229 let room = unsafe { Weak::from_raw(room as *mut Room) };
230 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
231 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
232 let track = RemoteVideoTrack::new(track, track_id, publisher_id);
233 if let Some(room) = room.upgrade() {
234 room.did_subscribe_to_remote_video_track(track);
235 }
236 }
237
238 extern "C" fn on_did_unsubscribe_from_remote_video_track(
239 room: *mut c_void,
240 publisher_id: CFStringRef,
241 track_id: CFStringRef,
242 ) {
243 let room = unsafe { Weak::from_raw(room as *mut Room) };
244 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
245 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
246 if let Some(room) = room.upgrade() {
247 room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
248 }
249 let _ = Weak::into_raw(room);
250 }
251}
252
253impl Drop for RoomDelegate {
254 fn drop(&mut self) {
255 unsafe {
256 LKRelease(self.native_delegate);
257 let _ = Weak::from_raw(self.weak_room);
258 }
259 }
260}
261
262pub struct LocalVideoTrack(*const c_void);
263
264impl LocalVideoTrack {
265 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
266 Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
267 }
268}
269
270impl Drop for LocalVideoTrack {
271 fn drop(&mut self) {
272 unsafe { LKRelease(self.0) }
273 }
274}
275
276#[derive(Debug)]
277pub struct RemoteVideoTrack {
278 native_track: *const c_void,
279 sid: Sid,
280 publisher_id: String,
281}
282
283impl RemoteVideoTrack {
284 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
285 unsafe {
286 CFRetain(native_track);
287 }
288 Self {
289 native_track,
290 sid,
291 publisher_id,
292 }
293 }
294
295 pub fn sid(&self) -> &str {
296 &self.sid
297 }
298
299 pub fn publisher_id(&self) -> &str {
300 &self.publisher_id
301 }
302
303 pub fn add_renderer<F>(&self, callback: F)
304 where
305 F: 'static + FnMut(CVImageBuffer),
306 {
307 extern "C" fn on_frame<F>(callback_data: *mut c_void, frame: CVImageBufferRef)
308 where
309 F: FnMut(CVImageBuffer),
310 {
311 unsafe {
312 let buffer = CVImageBuffer::wrap_under_get_rule(frame);
313 let callback = &mut *(callback_data as *mut F);
314 callback(buffer);
315 }
316 }
317
318 extern "C" fn on_drop<F>(callback_data: *mut c_void) {
319 unsafe {
320 let _ = Box::from_raw(callback_data as *mut F);
321 }
322 }
323
324 let callback_data = Box::into_raw(Box::new(callback));
325 unsafe {
326 let renderer =
327 LKVideoRendererCreate(callback_data as *mut c_void, on_frame::<F>, on_drop::<F>);
328 LKVideoTrackAddRenderer(self.native_track, renderer);
329 }
330 }
331}
332
333impl Drop for RemoteVideoTrack {
334 fn drop(&mut self) {
335 unsafe { LKRelease(self.native_track) }
336 }
337}
338
339pub enum RemoteVideoTrackUpdate {
340 Subscribed(Arc<RemoteVideoTrack>),
341 Unsubscribed { publisher_id: Sid, track_id: Sid },
342}
343
344pub struct MacOSDisplay(*const c_void);
345
346impl MacOSDisplay {
347 fn new(ptr: *const c_void) -> Self {
348 unsafe {
349 CFRetain(ptr);
350 }
351 Self(ptr)
352 }
353}
354
355impl Drop for MacOSDisplay {
356 fn drop(&mut self) {
357 unsafe { LKRelease(self.0) }
358 }
359}
360
361pub fn display_sources() -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
362 extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
363 unsafe {
364 let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
365
366 if sources.is_null() {
367 let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
368 } else {
369 let sources = CFArray::wrap_under_get_rule(sources)
370 .into_iter()
371 .map(|source| MacOSDisplay::new(*source))
372 .collect();
373
374 let _ = tx.send(Ok(sources));
375 }
376 }
377 }
378
379 let (tx, rx) = oneshot::channel();
380
381 unsafe {
382 LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
383 }
384
385 async move { rx.await.unwrap() }
386}
387
388#[cfg(test)]
389mod tests {
390 #[test]
391 fn test_client() {}
392}