1use anyhow::{anyhow, Context, Result};
2use core_foundation::{
3 array::{CFArray, CFArrayRef},
4 base::{CFRelease, CFRetain, TCFType},
5 string::{CFString, CFStringRef},
6};
7use futures::{
8 channel::{mpsc, oneshot},
9 Future,
10};
11pub use media::core_video::CVImageBuffer;
12use media::core_video::CVImageBufferRef;
13use parking_lot::Mutex;
14use std::{
15 ffi::c_void,
16 sync::{Arc, Weak},
17};
18
19extern "C" {
20 fn LKRoomDelegateCreate(
21 callback_data: *mut c_void,
22 on_did_subscribe_to_remote_video_track: extern "C" fn(
23 callback_data: *mut c_void,
24 publisher_id: CFStringRef,
25 track_id: CFStringRef,
26 remote_track: *const c_void,
27 ),
28 on_did_unsubscribe_from_remote_video_track: extern "C" fn(
29 callback_data: *mut c_void,
30 publisher_id: CFStringRef,
31 track_id: CFStringRef,
32 ),
33 ) -> *const c_void;
34
35 fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
36 fn LKRoomConnect(
37 room: *const c_void,
38 url: CFStringRef,
39 token: CFStringRef,
40 callback: extern "C" fn(*mut c_void, CFStringRef),
41 callback_data: *mut c_void,
42 );
43 fn LKRoomDisconnect(room: *const c_void);
44 fn LKRoomPublishVideoTrack(
45 room: *const c_void,
46 track: *const c_void,
47 callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
48 callback_data: *mut c_void,
49 );
50 fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
51 fn LKRoomVideoTracksForRemoteParticipant(
52 room: *const c_void,
53 participant_id: CFStringRef,
54 ) -> CFArrayRef;
55
56 fn LKVideoRendererCreate(
57 callback_data: *mut c_void,
58 on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
59 on_drop: extern "C" fn(callback_data: *mut c_void),
60 ) -> *const c_void;
61
62 fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
63 fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
64
65 fn LKDisplaySources(
66 callback_data: *mut c_void,
67 callback: extern "C" fn(
68 callback_data: *mut c_void,
69 sources: CFArrayRef,
70 error: CFStringRef,
71 ),
72 );
73 fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
74}
75
76pub type Sid = String;
77
78pub struct Room {
79 native_room: *const c_void,
80 remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
81 _delegate: RoomDelegate,
82}
83
84impl Room {
85 pub fn new() -> Arc<Self> {
86 Arc::new_cyclic(|weak_room| {
87 let delegate = RoomDelegate::new(weak_room.clone());
88 Self {
89 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
90 remote_video_track_subscribers: Default::default(),
91 _delegate: delegate,
92 }
93 })
94 }
95
96 pub fn connect(&self, url: &str, token: &str) -> impl Future<Output = Result<()>> {
97 let url = CFString::new(url);
98 let token = CFString::new(token);
99 let (did_connect, tx, rx) = Self::build_done_callback();
100 unsafe {
101 LKRoomConnect(
102 self.native_room,
103 url.as_concrete_TypeRef(),
104 token.as_concrete_TypeRef(),
105 did_connect,
106 tx,
107 )
108 }
109
110 async { rx.await.unwrap().context("error connecting to room") }
111 }
112
113 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
114 extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
115 unsafe {
116 let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
117
118 if sources.is_null() {
119 let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
120 } else {
121 let sources = CFArray::wrap_under_get_rule(sources)
122 .into_iter()
123 .map(|source| MacOSDisplay::new(*source))
124 .collect();
125
126 let _ = tx.send(Ok(sources));
127 }
128 }
129 }
130
131 let (tx, rx) = oneshot::channel();
132
133 unsafe {
134 LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
135 }
136
137 async move { rx.await.unwrap() }
138 }
139
140 pub fn publish_video_track(
141 self: &Arc<Self>,
142 track: &LocalVideoTrack,
143 ) -> impl Future<Output = Result<LocalTrackPublication>> {
144 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
145 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
146 let tx =
147 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
148 if error.is_null() {
149 let _ = tx.send(Ok(LocalTrackPublication(publication)));
150 } else {
151 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
152 let _ = tx.send(Err(anyhow!(error)));
153 }
154 }
155 unsafe {
156 LKRoomPublishVideoTrack(
157 self.native_room,
158 track.0,
159 callback,
160 Box::into_raw(Box::new(tx)) as *mut c_void,
161 );
162 }
163 async { rx.await.unwrap().context("error publishing video track") }
164 }
165
166 pub fn unpublish_track(&self, publication: LocalTrackPublication) {
167 unsafe {
168 LKRoomUnpublishTrack(self.native_room, publication.0);
169 }
170 }
171
172 pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
173 unsafe {
174 let tracks = LKRoomVideoTracksForRemoteParticipant(
175 self.native_room,
176 CFString::new(participant_id).as_concrete_TypeRef(),
177 );
178
179 if tracks.is_null() {
180 Vec::new()
181 } else {
182 let tracks = CFArray::wrap_under_get_rule(tracks);
183 tracks
184 .into_iter()
185 .map(|native_track| {
186 let native_track = *native_track;
187 let id =
188 CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
189 .to_string();
190 Arc::new(RemoteVideoTrack::new(
191 native_track,
192 id,
193 participant_id.into(),
194 ))
195 })
196 .collect()
197 }
198 }
199 }
200
201 pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
202 let (tx, rx) = mpsc::unbounded();
203 self.remote_video_track_subscribers.lock().push(tx);
204 rx
205 }
206
207 fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
208 let track = Arc::new(track);
209 self.remote_video_track_subscribers.lock().retain(|tx| {
210 tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
211 .is_ok()
212 });
213 }
214
215 fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
216 self.remote_video_track_subscribers.lock().retain(|tx| {
217 tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
218 publisher_id: publisher_id.clone(),
219 track_id: track_id.clone(),
220 })
221 .is_ok()
222 });
223 }
224
225 fn build_done_callback() -> (
226 extern "C" fn(*mut c_void, CFStringRef),
227 *mut c_void,
228 oneshot::Receiver<Result<()>>,
229 ) {
230 let (tx, rx) = oneshot::channel();
231 extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
232 let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
233 if error.is_null() {
234 let _ = tx.send(Ok(()));
235 } else {
236 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
237 let _ = tx.send(Err(anyhow!(error)));
238 }
239 }
240 (
241 done_callback,
242 Box::into_raw(Box::new(tx)) as *mut c_void,
243 rx,
244 )
245 }
246}
247
248impl Drop for Room {
249 fn drop(&mut self) {
250 unsafe {
251 LKRoomDisconnect(self.native_room);
252 CFRelease(self.native_room);
253 }
254 }
255}
256
257struct RoomDelegate {
258 native_delegate: *const c_void,
259 weak_room: *const Room,
260}
261
262impl RoomDelegate {
263 fn new(weak_room: Weak<Room>) -> Self {
264 let weak_room = Weak::into_raw(weak_room);
265 let native_delegate = unsafe {
266 LKRoomDelegateCreate(
267 weak_room as *mut c_void,
268 Self::on_did_subscribe_to_remote_video_track,
269 Self::on_did_unsubscribe_from_remote_video_track,
270 )
271 };
272 Self {
273 native_delegate,
274 weak_room,
275 }
276 }
277
278 extern "C" fn on_did_subscribe_to_remote_video_track(
279 room: *mut c_void,
280 publisher_id: CFStringRef,
281 track_id: CFStringRef,
282 track: *const c_void,
283 ) {
284 let room = unsafe { Weak::from_raw(room as *mut Room) };
285 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
286 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
287 let track = RemoteVideoTrack::new(track, track_id, publisher_id);
288 if let Some(room) = room.upgrade() {
289 room.did_subscribe_to_remote_video_track(track);
290 }
291 let _ = Weak::into_raw(room);
292 }
293
294 extern "C" fn on_did_unsubscribe_from_remote_video_track(
295 room: *mut c_void,
296 publisher_id: CFStringRef,
297 track_id: CFStringRef,
298 ) {
299 let room = unsafe { Weak::from_raw(room as *mut Room) };
300 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
301 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
302 if let Some(room) = room.upgrade() {
303 room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
304 }
305 let _ = Weak::into_raw(room);
306 }
307}
308
309impl Drop for RoomDelegate {
310 fn drop(&mut self) {
311 unsafe {
312 CFRelease(self.native_delegate);
313 let _ = Weak::from_raw(self.weak_room);
314 }
315 }
316}
317
318pub struct LocalVideoTrack(*const c_void);
319
320impl LocalVideoTrack {
321 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
322 Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
323 }
324}
325
326impl Drop for LocalVideoTrack {
327 fn drop(&mut self) {
328 unsafe { CFRelease(self.0) }
329 }
330}
331
332pub struct LocalTrackPublication(*const c_void);
333
334impl Drop for LocalTrackPublication {
335 fn drop(&mut self) {
336 unsafe { CFRelease(self.0) }
337 }
338}
339
340#[derive(Debug)]
341pub struct RemoteVideoTrack {
342 native_track: *const c_void,
343 sid: Sid,
344 publisher_id: String,
345}
346
347impl RemoteVideoTrack {
348 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
349 unsafe {
350 CFRetain(native_track);
351 }
352 Self {
353 native_track,
354 sid,
355 publisher_id,
356 }
357 }
358
359 pub fn sid(&self) -> &str {
360 &self.sid
361 }
362
363 pub fn publisher_id(&self) -> &str {
364 &self.publisher_id
365 }
366
367 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
368 extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
369 unsafe {
370 let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
371 let buffer = CVImageBuffer::wrap_under_get_rule(frame);
372 let result = tx.try_broadcast(Frame(buffer));
373 let _ = Box::into_raw(tx);
374 match result {
375 Ok(_) => true,
376 Err(async_broadcast::TrySendError::Closed(_))
377 | Err(async_broadcast::TrySendError::Inactive(_)) => {
378 log::warn!("no active receiver for frame");
379 false
380 }
381 Err(async_broadcast::TrySendError::Full(_)) => {
382 log::warn!("skipping frame as receiver is not keeping up");
383 true
384 }
385 }
386 }
387 }
388
389 extern "C" fn on_drop(callback_data: *mut c_void) {
390 unsafe {
391 let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
392 }
393 }
394
395 let (tx, rx) = async_broadcast::broadcast(64);
396 unsafe {
397 let renderer = LKVideoRendererCreate(
398 Box::into_raw(Box::new(tx)) as *mut c_void,
399 on_frame,
400 on_drop,
401 );
402 LKVideoTrackAddRenderer(self.native_track, renderer);
403 rx
404 }
405 }
406}
407
408impl Drop for RemoteVideoTrack {
409 fn drop(&mut self) {
410 unsafe { CFRelease(self.native_track) }
411 }
412}
413
414pub enum RemoteVideoTrackUpdate {
415 Subscribed(Arc<RemoteVideoTrack>),
416 Unsubscribed { publisher_id: Sid, track_id: Sid },
417}
418
419pub struct MacOSDisplay(*const c_void);
420
421impl MacOSDisplay {
422 fn new(ptr: *const c_void) -> Self {
423 unsafe {
424 CFRetain(ptr);
425 }
426 Self(ptr)
427 }
428}
429
430impl Drop for MacOSDisplay {
431 fn drop(&mut self) {
432 unsafe { CFRelease(self.0) }
433 }
434}
435
436#[derive(Clone)]
437pub struct Frame(CVImageBuffer);
438
439impl Frame {
440 pub fn width(&self) -> usize {
441 self.0.width()
442 }
443
444 pub fn height(&self) -> usize {
445 self.0.height()
446 }
447
448 pub fn image(&self) -> CVImageBuffer {
449 self.0.clone()
450 }
451}