1use anyhow::{anyhow, Context, Result};
2use core_foundation::{
3 array::{CFArray, CFArrayRef},
4 base::{CFRelease, CFRetain, TCFType},
5 string::{CFString, CFStringRef},
6};
7use futures::{
8 channel::{mpsc, oneshot},
9 Future,
10};
11pub use media::core_video::CVImageBuffer;
12use media::core_video::CVImageBufferRef;
13use parking_lot::Mutex;
14use postage::watch;
15use std::{
16 ffi::c_void,
17 sync::{Arc, Weak},
18};
19
// Foreign `LK*` functions resolved at link time. Every `*const c_void` / `*mut c_void` in
// these signatures is either an opaque native handle or an opaque `callback_data` pointer
// that accompanies a callback; this file never dereferences the native handles itself.
extern "C" {
    /// Creates a native room delegate.
    ///
    /// `callback_data` is the first argument of each of the three callbacks. In this file it
    /// is a raw `Weak<Room>` pointer (see `RoomDelegate::new`).
    fn LKRoomDelegateCreate(
        callback_data: *mut c_void,
        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
        on_did_subscribe_to_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
            remote_track: *const c_void,
        ),
        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
        ),
    ) -> *const c_void;

    /// Creates a native room bound to `delegate`; `Room` releases the result with `CFRelease`.
    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
    /// Starts connecting `room` to `url` using `token`.
    ///
    /// `callback` receives `callback_data` and an error string; the Rust callback in this
    /// file treats a null error string as success.
    fn LKRoomConnect(
        room: *const c_void,
        url: CFStringRef,
        token: CFStringRef,
        callback: extern "C" fn(*mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    /// Disconnects `room`; called from `Room`'s `Drop` implementation.
    fn LKRoomDisconnect(room: *const c_void);
    /// Publishes `track` in `room`.
    ///
    /// `callback` receives `callback_data`, a publication handle and an error string; the
    /// Rust callback in this file treats a null error string as success.
    fn LKRoomPublishVideoTrack(
        room: *const c_void,
        track: *const c_void,
        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    /// Unpublishes the track behind `publication` from `room`.
    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
    /// Returns the video tracks of the remote participant `participant_id`.
    ///
    /// The caller in this file handles a null return value as "no tracks".
    fn LKRoomVideoTracksForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    /// Creates a native video renderer that forwards frames to `on_frame`.
    ///
    /// NOTE(review): `on_drop` presumably runs when the renderer is destroyed, and the
    /// `bool` returned by `on_frame` presumably tells the native side whether to keep
    /// delivering frames — confirm against the native implementation.
    fn LKVideoRendererCreate(
        callback_data: *mut c_void,
        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
        on_drop: extern "C" fn(callback_data: *mut c_void),
    ) -> *const c_void;

    /// Attaches `renderer` to `track`.
    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
    /// Returns the sid of a remote video track as a Core Foundation string.
    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;

    /// Requests the list of display sources.
    ///
    /// `callback` receives `callback_data`, an array of sources and an error string; the
    /// Rust callback in this file treats a null `sources` array as failure.
    fn LKDisplaySources(
        callback_data: *mut c_void,
        callback: extern "C" fn(
            callback_data: *mut c_void,
            sources: CFArrayRef,
            error: CFStringRef,
        ),
    );
    /// Creates a screen-share track for `display`; `LocalVideoTrack` releases the result
    /// with `CFRelease`.
    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
}
77
/// String identifier used in this module for track sids and publisher ids.
pub type Sid = String;
79
/// Connection status of a [`Room`], as published through the receiver returned by
/// `Room::status`.
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// Initial state, and the state set again when the delegate reports a disconnect.
    Disconnected,
    /// Set once a `Room::connect` call completes without error; stores the `url` and
    /// `token` that were passed to that call.
    Connected { url: String, token: String },
}
85
/// Rust-side wrapper around a native room handle.
pub struct Room {
    /// Handle returned by `LKRoomCreate`; disconnected and released in `Drop`.
    native_room: *const c_void,
    /// Both halves of the watch channel that carries the current [`ConnectionState`].
    connection: Mutex<(
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    )>,
    /// Senders handed out via `remote_video_track_updates`; a sender is removed the first
    /// time sending to it fails.
    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
    /// Delegate passed to `LKRoomCreate`; stored so it is dropped together with the room.
    _delegate: RoomDelegate,
}
95
96impl Room {
97 pub fn new() -> Arc<Self> {
98 Arc::new_cyclic(|weak_room| {
99 let delegate = RoomDelegate::new(weak_room.clone());
100 Self {
101 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
102 connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
103 remote_video_track_subscribers: Default::default(),
104 _delegate: delegate,
105 }
106 })
107 }
108
109 pub fn status(&self) -> watch::Receiver<ConnectionState> {
110 self.connection.lock().1.clone()
111 }
112
113 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
114 let url = CFString::new(url);
115 let token = CFString::new(token);
116 let (did_connect, tx, rx) = Self::build_done_callback();
117 unsafe {
118 LKRoomConnect(
119 self.native_room,
120 url.as_concrete_TypeRef(),
121 token.as_concrete_TypeRef(),
122 did_connect,
123 tx,
124 )
125 }
126
127 let this = self.clone();
128 let url = url.to_string();
129 let token = token.to_string();
130 async move {
131 match rx.await.unwrap().context("error connecting to room") {
132 Ok(()) => {
133 *this.connection.lock().0.borrow_mut() =
134 ConnectionState::Connected { url, token };
135 Ok(())
136 }
137 Err(err) => Err(err),
138 }
139 }
140 }
141
142 fn did_disconnect(&self) {
143 *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
144 }
145
146 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
147 extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
148 unsafe {
149 let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
150
151 if sources.is_null() {
152 let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
153 } else {
154 let sources = CFArray::wrap_under_get_rule(sources)
155 .into_iter()
156 .map(|source| MacOSDisplay::new(*source))
157 .collect();
158
159 let _ = tx.send(Ok(sources));
160 }
161 }
162 }
163
164 let (tx, rx) = oneshot::channel();
165
166 unsafe {
167 LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
168 }
169
170 async move { rx.await.unwrap() }
171 }
172
173 pub fn publish_video_track(
174 self: &Arc<Self>,
175 track: &LocalVideoTrack,
176 ) -> impl Future<Output = Result<LocalTrackPublication>> {
177 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
178 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
179 let tx =
180 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
181 if error.is_null() {
182 let _ = tx.send(Ok(LocalTrackPublication(publication)));
183 } else {
184 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
185 let _ = tx.send(Err(anyhow!(error)));
186 }
187 }
188 unsafe {
189 LKRoomPublishVideoTrack(
190 self.native_room,
191 track.0,
192 callback,
193 Box::into_raw(Box::new(tx)) as *mut c_void,
194 );
195 }
196 async { rx.await.unwrap().context("error publishing video track") }
197 }
198
199 pub fn unpublish_track(&self, publication: LocalTrackPublication) {
200 unsafe {
201 LKRoomUnpublishTrack(self.native_room, publication.0);
202 }
203 }
204
205 pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
206 unsafe {
207 let tracks = LKRoomVideoTracksForRemoteParticipant(
208 self.native_room,
209 CFString::new(participant_id).as_concrete_TypeRef(),
210 );
211
212 if tracks.is_null() {
213 Vec::new()
214 } else {
215 let tracks = CFArray::wrap_under_get_rule(tracks);
216 tracks
217 .into_iter()
218 .map(|native_track| {
219 let native_track = *native_track;
220 let id =
221 CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
222 .to_string();
223 Arc::new(RemoteVideoTrack::new(
224 native_track,
225 id,
226 participant_id.into(),
227 ))
228 })
229 .collect()
230 }
231 }
232 }
233
234 pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
235 let (tx, rx) = mpsc::unbounded();
236 self.remote_video_track_subscribers.lock().push(tx);
237 rx
238 }
239
240 fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
241 let track = Arc::new(track);
242 self.remote_video_track_subscribers.lock().retain(|tx| {
243 tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
244 .is_ok()
245 });
246 }
247
248 fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
249 self.remote_video_track_subscribers.lock().retain(|tx| {
250 tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
251 publisher_id: publisher_id.clone(),
252 track_id: track_id.clone(),
253 })
254 .is_ok()
255 });
256 }
257
258 fn build_done_callback() -> (
259 extern "C" fn(*mut c_void, CFStringRef),
260 *mut c_void,
261 oneshot::Receiver<Result<()>>,
262 ) {
263 let (tx, rx) = oneshot::channel();
264 extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
265 let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
266 if error.is_null() {
267 let _ = tx.send(Ok(()));
268 } else {
269 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
270 let _ = tx.send(Err(anyhow!(error)));
271 }
272 }
273 (
274 done_callback,
275 Box::into_raw(Box::new(tx)) as *mut c_void,
276 rx,
277 )
278 }
279}
280
281impl Drop for Room {
282 fn drop(&mut self) {
283 unsafe {
284 LKRoomDisconnect(self.native_room);
285 CFRelease(self.native_room);
286 }
287 }
288}
289
/// Owns the native delegate handle together with the raw `Weak<Room>` pointer that is
/// handed to the native side as callback data.
struct RoomDelegate {
    /// Handle returned by `LKRoomDelegateCreate`; released in `Drop`.
    native_delegate: *const c_void,
    /// Result of `Weak::into_raw`; converted back into a `Weak` (and dropped) in `Drop`.
    weak_room: *const Room,
}
294
295impl RoomDelegate {
296 fn new(weak_room: Weak<Room>) -> Self {
297 let weak_room = Weak::into_raw(weak_room);
298 let native_delegate = unsafe {
299 LKRoomDelegateCreate(
300 weak_room as *mut c_void,
301 Self::on_did_disconnect,
302 Self::on_did_subscribe_to_remote_video_track,
303 Self::on_did_unsubscribe_from_remote_video_track,
304 )
305 };
306 Self {
307 native_delegate,
308 weak_room,
309 }
310 }
311
312 extern "C" fn on_did_disconnect(room: *mut c_void) {
313 let room = unsafe { Weak::from_raw(room as *mut Room) };
314 if let Some(room) = room.upgrade() {
315 room.did_disconnect();
316 }
317 let _ = Weak::into_raw(room);
318 }
319
320 extern "C" fn on_did_subscribe_to_remote_video_track(
321 room: *mut c_void,
322 publisher_id: CFStringRef,
323 track_id: CFStringRef,
324 track: *const c_void,
325 ) {
326 let room = unsafe { Weak::from_raw(room as *mut Room) };
327 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
328 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
329 let track = RemoteVideoTrack::new(track, track_id, publisher_id);
330 if let Some(room) = room.upgrade() {
331 room.did_subscribe_to_remote_video_track(track);
332 }
333 let _ = Weak::into_raw(room);
334 }
335
336 extern "C" fn on_did_unsubscribe_from_remote_video_track(
337 room: *mut c_void,
338 publisher_id: CFStringRef,
339 track_id: CFStringRef,
340 ) {
341 let room = unsafe { Weak::from_raw(room as *mut Room) };
342 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
343 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
344 if let Some(room) = room.upgrade() {
345 room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
346 }
347 let _ = Weak::into_raw(room);
348 }
349}
350
351impl Drop for RoomDelegate {
352 fn drop(&mut self) {
353 unsafe {
354 CFRelease(self.native_delegate);
355 let _ = Weak::from_raw(self.weak_room);
356 }
357 }
358}
359
/// Wrapper around a native local video track handle; the handle is released with
/// `CFRelease` when the wrapper is dropped.
pub struct LocalVideoTrack(*const c_void);
361
362impl LocalVideoTrack {
363 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
364 Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
365 }
366}
367
368impl Drop for LocalVideoTrack {
369 fn drop(&mut self) {
370 unsafe { CFRelease(self.0) }
371 }
372}
373
/// Wrapper around the native publication handle delivered by the publish callback; the
/// handle is released with `CFRelease` when the wrapper is dropped.
pub struct LocalTrackPublication(*const c_void);
375
376impl Drop for LocalTrackPublication {
377 fn drop(&mut self) {
378 unsafe { CFRelease(self.0) }
379 }
380}
381
/// A video track published by a remote participant.
#[derive(Debug)]
pub struct RemoteVideoTrack {
    /// Native track handle; retained in `RemoteVideoTrack::new` and released in `Drop`.
    native_track: *const c_void,
    /// Identifier of the track, exposed through `sid()`.
    sid: Sid,
    /// Identifier of the publishing participant, exposed through `publisher_id()`.
    publisher_id: String,
}
388
389impl RemoteVideoTrack {
390 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
391 unsafe {
392 CFRetain(native_track);
393 }
394 Self {
395 native_track,
396 sid,
397 publisher_id,
398 }
399 }
400
401 pub fn sid(&self) -> &str {
402 &self.sid
403 }
404
405 pub fn publisher_id(&self) -> &str {
406 &self.publisher_id
407 }
408
409 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
410 extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
411 unsafe {
412 let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
413 let buffer = CVImageBuffer::wrap_under_get_rule(frame);
414 let result = tx.try_broadcast(Frame(buffer));
415 let _ = Box::into_raw(tx);
416 match result {
417 Ok(_) => true,
418 Err(async_broadcast::TrySendError::Closed(_))
419 | Err(async_broadcast::TrySendError::Inactive(_)) => {
420 log::warn!("no active receiver for frame");
421 false
422 }
423 Err(async_broadcast::TrySendError::Full(_)) => {
424 log::warn!("skipping frame as receiver is not keeping up");
425 true
426 }
427 }
428 }
429 }
430
431 extern "C" fn on_drop(callback_data: *mut c_void) {
432 unsafe {
433 let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
434 }
435 }
436
437 let (tx, rx) = async_broadcast::broadcast(64);
438 unsafe {
439 let renderer = LKVideoRendererCreate(
440 Box::into_raw(Box::new(tx)) as *mut c_void,
441 on_frame,
442 on_drop,
443 );
444 LKVideoTrackAddRenderer(self.native_track, renderer);
445 rx
446 }
447 }
448}
449
450impl Drop for RemoteVideoTrack {
451 fn drop(&mut self) {
452 unsafe { CFRelease(self.native_track) }
453 }
454}
455
/// Event delivered to the receivers returned by `Room::remote_video_track_updates`.
pub enum RemoteVideoTrackUpdate {
    /// Sent when the room delegate reports a subscription to a remote video track.
    Subscribed(Arc<RemoteVideoTrack>),
    /// Sent when the room delegate reports an unsubscription from a remote video track.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
460
/// Wrapper around a native display-source handle; the handle is retained in
/// `MacOSDisplay::new` and released with `CFRelease` when the wrapper is dropped.
pub struct MacOSDisplay(*const c_void);
462
463impl MacOSDisplay {
464 fn new(ptr: *const c_void) -> Self {
465 unsafe {
466 CFRetain(ptr);
467 }
468 Self(ptr)
469 }
470}
471
472impl Drop for MacOSDisplay {
473 fn drop(&mut self) {
474 unsafe { CFRelease(self.0) }
475 }
476}
477
/// A single video frame, backed by a `CVImageBuffer`.
#[derive(Clone)]
pub struct Frame(CVImageBuffer);
480
481impl Frame {
482 pub fn width(&self) -> usize {
483 self.0.width()
484 }
485
486 pub fn height(&self) -> usize {
487 self.0.height()
488 }
489
490 pub fn image(&self) -> CVImageBuffer {
491 self.0.clone()
492 }
493}