1use anyhow::{anyhow, Context, Result};
2use core_foundation::{
3 array::{CFArray, CFArrayRef},
4 base::{CFRelease, CFRetain, TCFType},
5 string::{CFString, CFStringRef},
6};
7use futures::{
8 channel::{mpsc, oneshot},
9 Future,
10};
11pub use media::core_video::CVImageBuffer;
12use media::core_video::CVImageBufferRef;
13use parking_lot::Mutex;
14use postage::watch;
15use std::{
16 ffi::c_void,
17 sync::{Arc, Weak},
18};
19
20extern "C" {
21 fn LKRoomDelegateCreate(
22 callback_data: *mut c_void,
23 on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
24 on_did_subscribe_to_remote_audio_track: extern "C" fn(
25 callback_data: *mut c_void,
26 publisher_id: CFStringRef,
27 track_id: CFStringRef,
28 remote_track: *const c_void,
29 ),
30 on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
31 callback_data: *mut c_void,
32 publisher_id: CFStringRef,
33 track_id: CFStringRef,
34 ),
35 on_did_subscribe_to_remote_video_track: extern "C" fn(
36 callback_data: *mut c_void,
37 publisher_id: CFStringRef,
38 track_id: CFStringRef,
39 remote_track: *const c_void,
40 ),
41 on_did_unsubscribe_from_remote_video_track: extern "C" fn(
42 callback_data: *mut c_void,
43 publisher_id: CFStringRef,
44 track_id: CFStringRef,
45 ),
46 ) -> *const c_void;
47
48 fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
49 fn LKRoomConnect(
50 room: *const c_void,
51 url: CFStringRef,
52 token: CFStringRef,
53 callback: extern "C" fn(*mut c_void, CFStringRef),
54 callback_data: *mut c_void,
55 );
56 fn LKRoomDisconnect(room: *const c_void);
57 fn LKRoomPublishVideoTrack(
58 room: *const c_void,
59 track: *const c_void,
60 callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
61 callback_data: *mut c_void,
62 );
63 fn LKRoomPublishAudioTrack(
64 room: *const c_void,
65 track: *const c_void,
66 callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
67 callback_data: *mut c_void,
68 );
69 fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
70 fn LKRoomAudioTracksForRemoteParticipant(
71 room: *const c_void,
72 participant_id: CFStringRef,
73 ) -> CFArrayRef;
74
75 fn LKRoomVideoTracksForRemoteParticipant(
76 room: *const c_void,
77 participant_id: CFStringRef,
78 ) -> CFArrayRef;
79
80 fn LKVideoRendererCreate(
81 callback_data: *mut c_void,
82 on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
83 on_drop: extern "C" fn(callback_data: *mut c_void),
84 ) -> *const c_void;
85
86 fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef;
87 // fn LKRemoteAudioTrackStart(
88 // track: *const c_void,
89 // callback: extern "C" fn(*mut c_void, bool),
90 // callback_data: *mut c_void
91 // );
92
93 fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
94 fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
95
96 fn LKDisplaySources(
97 callback_data: *mut c_void,
98 callback: extern "C" fn(
99 callback_data: *mut c_void,
100 sources: CFArrayRef,
101 error: CFStringRef,
102 ),
103 );
104 fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
105 fn LKLocalAudioTrackCreateTrack() -> *const c_void;
106}
107
108pub type Sid = String;
109
110#[derive(Clone, Eq, PartialEq)]
111pub enum ConnectionState {
112 Disconnected,
113 Connected { url: String, token: String },
114}
115
116pub struct Room {
117 native_room: *const c_void,
118 connection: Mutex<(
119 watch::Sender<ConnectionState>,
120 watch::Receiver<ConnectionState>,
121 )>,
122 remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
123 remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
124 _delegate: RoomDelegate,
125}
126
127impl Room {
128 pub fn new() -> Arc<Self> {
129 Arc::new_cyclic(|weak_room| {
130 let delegate = RoomDelegate::new(weak_room.clone());
131 Self {
132 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
133 connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
134 remote_audio_track_subscribers: Default::default(),
135 remote_video_track_subscribers: Default::default(),
136 _delegate: delegate,
137 }
138 })
139 }
140
141 pub fn status(&self) -> watch::Receiver<ConnectionState> {
142 self.connection.lock().1.clone()
143 }
144
145 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
146 let url = CFString::new(url);
147 let token = CFString::new(token);
148 let (did_connect, tx, rx) = Self::build_done_callback();
149 unsafe {
150 LKRoomConnect(
151 self.native_room,
152 url.as_concrete_TypeRef(),
153 token.as_concrete_TypeRef(),
154 did_connect,
155 tx,
156 )
157 }
158
159 let this = self.clone();
160 let url = url.to_string();
161 let token = token.to_string();
162 async move {
163 rx.await.unwrap().context("error connecting to room")?;
164 *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
165 Ok(())
166 }
167 }
168
169 fn did_disconnect(&self) {
170 *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
171 }
172
173 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
174 extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
175 unsafe {
176 let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
177
178 if sources.is_null() {
179 let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
180 } else {
181 let sources = CFArray::wrap_under_get_rule(sources)
182 .into_iter()
183 .map(|source| MacOSDisplay::new(*source))
184 .collect();
185
186 let _ = tx.send(Ok(sources));
187 }
188 }
189 }
190
191 let (tx, rx) = oneshot::channel();
192
193 unsafe {
194 LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
195 }
196
197 async move { rx.await.unwrap() }
198 }
199
200 pub fn publish_video_track(
201 self: &Arc<Self>,
202 track: &LocalVideoTrack,
203 ) -> impl Future<Output = Result<LocalTrackPublication>> {
204 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
205 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
206 let tx =
207 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
208 if error.is_null() {
209 let _ = tx.send(Ok(LocalTrackPublication(publication)));
210 } else {
211 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
212 let _ = tx.send(Err(anyhow!(error)));
213 }
214 }
215 unsafe {
216 LKRoomPublishVideoTrack(
217 self.native_room,
218 track.0,
219 callback,
220 Box::into_raw(Box::new(tx)) as *mut c_void,
221 );
222 }
223 async { rx.await.unwrap().context("error publishing video track") }
224 }
225
226 pub fn publish_audio_track(
227 self: &Arc<Self>,
228 track: &LocalAudioTrack,
229 ) -> impl Future<Output = Result<LocalTrackPublication>> {
230 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
231 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
232 let tx =
233 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
234 if error.is_null() {
235 let _ = tx.send(Ok(LocalTrackPublication(publication)));
236 } else {
237 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
238 let _ = tx.send(Err(anyhow!(error)));
239 }
240 }
241 unsafe {
242 LKRoomPublishAudioTrack(
243 self.native_room,
244 track.0,
245 callback,
246 Box::into_raw(Box::new(tx)) as *mut c_void,
247 );
248 }
249 async { rx.await.unwrap().context("error publishing video track") }
250 }
251
252 pub fn unpublish_track(&self, publication: LocalTrackPublication) {
253 unsafe {
254 LKRoomUnpublishTrack(self.native_room, publication.0);
255 }
256 }
257
258 pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
259 unsafe {
260 let tracks = LKRoomVideoTracksForRemoteParticipant(
261 self.native_room,
262 CFString::new(participant_id).as_concrete_TypeRef(),
263 );
264
265 if tracks.is_null() {
266 Vec::new()
267 } else {
268 let tracks = CFArray::wrap_under_get_rule(tracks);
269 tracks
270 .into_iter()
271 .map(|native_track| {
272 let native_track = *native_track;
273 let id =
274 CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
275 .to_string();
276 Arc::new(RemoteVideoTrack::new(
277 native_track,
278 id,
279 participant_id.into(),
280 ))
281 })
282 .collect()
283 }
284 }
285 }
286
287 pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
288 unsafe {
289 let tracks = LKRoomAudioTracksForRemoteParticipant(
290 self.native_room,
291 CFString::new(participant_id).as_concrete_TypeRef(),
292 );
293
294 if tracks.is_null() {
295 Vec::new()
296 } else {
297 let tracks = CFArray::wrap_under_get_rule(tracks);
298 tracks
299 .into_iter()
300 .map(|native_track| {
301 let native_track = *native_track;
302 let id =
303 CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
304 .to_string();
305 Arc::new(RemoteAudioTrack::new(
306 native_track,
307 id,
308 participant_id.into(),
309 ))
310 })
311 .collect()
312 }
313 }
314 }
315
316 pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
317 let (tx, rx) = mpsc::unbounded();
318 self.remote_audio_track_subscribers.lock().push(tx);
319 rx
320 }
321
322 pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
323 let (tx, rx) = mpsc::unbounded();
324 self.remote_video_track_subscribers.lock().push(tx);
325 rx
326 }
327
328 fn did_subscribe_to_remote_audio_track(&self, track: RemoteAudioTrack) {
329 let track = Arc::new(track);
330 self.remote_audio_track_subscribers.lock().retain(|tx| {
331 tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(track.clone()))
332 .is_ok()
333 });
334 }
335
336 fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
337 self.remote_audio_track_subscribers.lock().retain(|tx| {
338 tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
339 publisher_id: publisher_id.clone(),
340 track_id: track_id.clone(),
341 })
342 .is_ok()
343 });
344 }
345
346 fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
347 let track = Arc::new(track);
348 self.remote_video_track_subscribers.lock().retain(|tx| {
349 tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
350 .is_ok()
351 });
352 }
353
354 fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
355 self.remote_video_track_subscribers.lock().retain(|tx| {
356 tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
357 publisher_id: publisher_id.clone(),
358 track_id: track_id.clone(),
359 })
360 .is_ok()
361 });
362 }
363
364 fn build_done_callback() -> (
365 extern "C" fn(*mut c_void, CFStringRef),
366 *mut c_void,
367 oneshot::Receiver<Result<()>>,
368 ) {
369 let (tx, rx) = oneshot::channel();
370 extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
371 let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
372 if error.is_null() {
373 let _ = tx.send(Ok(()));
374 } else {
375 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
376 let _ = tx.send(Err(anyhow!(error)));
377 }
378 }
379 (
380 done_callback,
381 Box::into_raw(Box::new(tx)) as *mut c_void,
382 rx,
383 )
384 }
385}
386
387impl Drop for Room {
388 fn drop(&mut self) {
389 unsafe {
390 LKRoomDisconnect(self.native_room);
391 CFRelease(self.native_room);
392 }
393 }
394}
395
396struct RoomDelegate {
397 native_delegate: *const c_void,
398 weak_room: *const Room,
399}
400
401impl RoomDelegate {
402 fn new(weak_room: Weak<Room>) -> Self {
403 let weak_room = Weak::into_raw(weak_room);
404 let native_delegate = unsafe {
405 LKRoomDelegateCreate(
406 weak_room as *mut c_void,
407 Self::on_did_disconnect,
408 Self::on_did_subscribe_to_remote_audio_track,
409 Self::on_did_unsubscribe_from_remote_audio_track,
410 Self::on_did_subscribe_to_remote_video_track,
411 Self::on_did_unsubscribe_from_remote_video_track,
412 )
413 };
414 Self {
415 native_delegate,
416 weak_room,
417 }
418 }
419
420 extern "C" fn on_did_disconnect(room: *mut c_void) {
421 let room = unsafe { Weak::from_raw(room as *mut Room) };
422 if let Some(room) = room.upgrade() {
423 room.did_disconnect();
424 }
425 let _ = Weak::into_raw(room);
426 }
427
428 extern "C" fn on_did_subscribe_to_remote_audio_track(
429 room: *mut c_void,
430 publisher_id: CFStringRef,
431 track_id: CFStringRef,
432 track: *const c_void,
433 ) {
434 let room = unsafe { Weak::from_raw(room as *mut Room) };
435 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
436 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
437 let track = RemoteAudioTrack::new(track, track_id, publisher_id);
438 if let Some(room) = room.upgrade() {
439 room.did_subscribe_to_remote_audio_track(track);
440 }
441 let _ = Weak::into_raw(room);
442 }
443
444 extern "C" fn on_did_unsubscribe_from_remote_audio_track(
445 room: *mut c_void,
446 publisher_id: CFStringRef,
447 track_id: CFStringRef,
448 ) {
449 let room = unsafe { Weak::from_raw(room as *mut Room) };
450 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
451 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
452 if let Some(room) = room.upgrade() {
453 room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
454 }
455 let _ = Weak::into_raw(room);
456 }
457
458 extern "C" fn on_did_subscribe_to_remote_video_track(
459 room: *mut c_void,
460 publisher_id: CFStringRef,
461 track_id: CFStringRef,
462 track: *const c_void,
463 ) {
464 let room = unsafe { Weak::from_raw(room as *mut Room) };
465 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
466 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
467 let track = RemoteVideoTrack::new(track, track_id, publisher_id);
468 if let Some(room) = room.upgrade() {
469 room.did_subscribe_to_remote_video_track(track);
470 }
471 let _ = Weak::into_raw(room);
472 }
473
474 extern "C" fn on_did_unsubscribe_from_remote_video_track(
475 room: *mut c_void,
476 publisher_id: CFStringRef,
477 track_id: CFStringRef,
478 ) {
479 let room = unsafe { Weak::from_raw(room as *mut Room) };
480 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
481 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
482 if let Some(room) = room.upgrade() {
483 room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
484 }
485 let _ = Weak::into_raw(room);
486 }
487}
488
489impl Drop for RoomDelegate {
490 fn drop(&mut self) {
491 unsafe {
492 CFRelease(self.native_delegate);
493 let _ = Weak::from_raw(self.weak_room);
494 }
495 }
496}
497
498pub struct LocalAudioTrack(*const c_void);
499
500impl LocalAudioTrack {
501 pub fn create() -> Self {
502 Self(unsafe { LKLocalAudioTrackCreateTrack() })
503 }
504}
505
506impl Drop for LocalAudioTrack {
507 fn drop(&mut self) {
508 unsafe { CFRelease(self.0) }
509 }
510}
511
512pub struct LocalVideoTrack(*const c_void);
513
514impl LocalVideoTrack {
515 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
516 Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
517 }
518}
519
520impl Drop for LocalVideoTrack {
521 fn drop(&mut self) {
522 unsafe { CFRelease(self.0) }
523 }
524}
525
526pub struct LocalTrackPublication(*const c_void);
527
528impl Drop for LocalTrackPublication {
529 fn drop(&mut self) {
530 unsafe { CFRelease(self.0) }
531 }
532}
533
534#[derive(Debug)]
535pub struct RemoteAudioTrack {
536 _native_track: *const c_void,
537 sid: Sid,
538 publisher_id: String,
539}
540
541impl RemoteAudioTrack {
542 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
543 unsafe {
544 CFRetain(native_track);
545 }
546 Self {
547 _native_track: native_track,
548 sid,
549 publisher_id,
550 }
551 }
552
553 pub fn sid(&self) -> &str {
554 &self.sid
555 }
556
557 pub fn publisher_id(&self) -> &str {
558 &self.publisher_id
559 }
560}
561
562#[derive(Debug)]
563pub struct RemoteVideoTrack {
564 native_track: *const c_void,
565 sid: Sid,
566 publisher_id: String,
567}
568
569impl RemoteVideoTrack {
570 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
571 unsafe {
572 CFRetain(native_track);
573 }
574 Self {
575 native_track,
576 sid,
577 publisher_id,
578 }
579 }
580
581 pub fn sid(&self) -> &str {
582 &self.sid
583 }
584
585 pub fn publisher_id(&self) -> &str {
586 &self.publisher_id
587 }
588
589 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
590 extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
591 unsafe {
592 let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
593 let buffer = CVImageBuffer::wrap_under_get_rule(frame);
594 let result = tx.try_broadcast(Frame(buffer));
595 let _ = Box::into_raw(tx);
596 match result {
597 Ok(_) => true,
598 Err(async_broadcast::TrySendError::Closed(_))
599 | Err(async_broadcast::TrySendError::Inactive(_)) => {
600 log::warn!("no active receiver for frame");
601 false
602 }
603 Err(async_broadcast::TrySendError::Full(_)) => {
604 log::warn!("skipping frame as receiver is not keeping up");
605 true
606 }
607 }
608 }
609 }
610
611 extern "C" fn on_drop(callback_data: *mut c_void) {
612 unsafe {
613 let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
614 }
615 }
616
617 let (tx, rx) = async_broadcast::broadcast(64);
618 unsafe {
619 let renderer = LKVideoRendererCreate(
620 Box::into_raw(Box::new(tx)) as *mut c_void,
621 on_frame,
622 on_drop,
623 );
624 LKVideoTrackAddRenderer(self.native_track, renderer);
625 rx
626 }
627 }
628}
629
630impl Drop for RemoteVideoTrack {
631 fn drop(&mut self) {
632 unsafe { CFRelease(self.native_track) }
633 }
634}
635
636pub enum RemoteVideoTrackUpdate {
637 Subscribed(Arc<RemoteVideoTrack>),
638 Unsubscribed { publisher_id: Sid, track_id: Sid },
639}
640
641pub enum RemoteAudioTrackUpdate {
642 Subscribed(Arc<RemoteAudioTrack>),
643 Unsubscribed { publisher_id: Sid, track_id: Sid },
644}
645
646pub struct MacOSDisplay(*const c_void);
647
648impl MacOSDisplay {
649 fn new(ptr: *const c_void) -> Self {
650 unsafe {
651 CFRetain(ptr);
652 }
653 Self(ptr)
654 }
655}
656
657impl Drop for MacOSDisplay {
658 fn drop(&mut self) {
659 unsafe { CFRelease(self.0) }
660 }
661}
662
663#[derive(Clone)]
664pub struct Frame(CVImageBuffer);
665
666impl Frame {
667 pub fn width(&self) -> usize {
668 self.0.width()
669 }
670
671 pub fn height(&self) -> usize {
672 self.0.height()
673 }
674
675 pub fn image(&self) -> CVImageBuffer {
676 self.0.clone()
677 }
678}