1use anyhow::{anyhow, Context, Result};
2use core_foundation::{
3 array::{CFArray, CFArrayRef},
4 base::{CFRelease, CFRetain, TCFType},
5 string::{CFString, CFStringRef},
6};
7use futures::{
8 channel::{mpsc, oneshot},
9 Future,
10};
11pub use media::core_video::CVImageBuffer;
12use media::core_video::CVImageBufferRef;
13use parking_lot::Mutex;
14use postage::watch;
15use std::{
16 ffi::c_void,
17 sync::{Arc, Weak},
18};
19
// Native entry points used by this module. Their definitions are not in this
// file (presumably they come from the LiveKit bridge library — confirm against
// the build setup).
extern "C" {
    // Creates the native room delegate. `RoomDelegate::new` passes a raw
    // `Weak<Room>` as `callback_data` together with one callback per room event.
    fn LKRoomDelegateCreate(
        callback_data: *mut c_void,
        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
        on_did_subscribe_to_remote_audio_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
            remote_track: *const c_void,
            remote_publication: *const c_void,
        ),
        on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
        ),
        on_mute_changed_from_remote_audio_track: extern "C" fn(
            callback_data: *mut c_void,
            track_id: CFStringRef,
            muted: bool,
        ),
        on_active_speakers_changed: extern "C" fn(
            callback_data: *mut c_void,
            participants: CFArrayRef,
        ),
        on_did_subscribe_to_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
            remote_track: *const c_void,
        ),
        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
        ),
    ) -> *const c_void;

    // Room lifecycle. `Room::new` creates the room from a delegate pointer and
    // `Room`'s `Drop` disconnects it before releasing it.
    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
    // The callback receives `callback_data` and an error string; callers in this
    // file treat a null error as success.
    fn LKRoomConnect(
        room: *const c_void,
        url: CFStringRef,
        token: CFStringRef,
        callback: extern "C" fn(*mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    fn LKRoomDisconnect(room: *const c_void);
    // Publishing. The callback receives `callback_data`, the publication pointer
    // and an error string; callers in this file treat a null error as success.
    fn LKRoomPublishVideoTrack(
        room: *const c_void,
        track: *const c_void,
        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    fn LKRoomPublishAudioTrack(
        room: *const c_void,
        track: *const c_void,
        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
    // Per-participant queries. Callers in this file treat a null array as
    // "no entries".
    fn LKRoomAudioTracksForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    fn LKRoomAudioTrackPublicationsForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    fn LKRoomVideoTracksForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    // Creates a renderer that reports frames through `on_frame`. `on_drop` is
    // used by `RemoteVideoTrack::frames` to free `callback_data` (presumably
    // invoked when the native renderer is destroyed — confirm).
    fn LKVideoRendererCreate(
        callback_data: *mut c_void,
        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
        on_drop: extern "C" fn(callback_data: *mut c_void),
    ) -> *const c_void;

    fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef;
    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;

    // Reports the available display sources through `callback`; the caller in
    // this file treats a null `sources` array as failure and reads `error`.
    fn LKDisplaySources(
        callback_data: *mut c_void,
        callback: extern "C" fn(
            callback_data: *mut c_void,
            sources: CFArrayRef,
            error: CFStringRef,
        ),
    );
    // Local track constructors; the returned pointers are released with
    // `CFRelease` by `LocalVideoTrack` / `LocalAudioTrack` on drop.
    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
    fn LKLocalAudioTrackCreateTrack() -> *const c_void;

    // `on_complete` receives `callback_data` and an error string; callers in
    // this file treat a null error as success.
    fn LKLocalTrackPublicationSetMute(
        publication: *const c_void,
        muted: bool,
        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
        callback_data: *mut c_void,
    );

    fn LKRemoteTrackPublicationSetEnabled(
        publication: *const c_void,
        enabled: bool,
        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
        callback_data: *mut c_void,
    );

    fn LKRemoteTrackPublicationIsMuted(publication: *const c_void) -> bool;
    fn LKRemoteTrackPublicationGetSid(publication: *const c_void) -> CFStringRef;
}
133
/// String identifier used in this module for track ids and publisher ids.
pub type Sid = String;
135
/// Connection status of a [`Room`], as published through [`Room::status`].
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// Initial state, and the state set again when the delegate reports a disconnect.
    Disconnected,
    /// Set once `Room::connect` completes successfully; holds the url and token
    /// that were used for that connection.
    Connected { url: String, token: String },
}
141
/// Rust-side handle for a native room and the channels that fan its events out.
pub struct Room {
    /// Pointer returned by `LKRoomCreate`; released with `CFRelease` on drop.
    native_room: *const c_void,
    /// Both halves of the watch channel carrying the current [`ConnectionState`].
    connection: Mutex<(
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    )>,
    /// Senders handed out by `remote_audio_track_updates`; pruned when a send fails.
    remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
    /// Senders handed out by `remote_video_track_updates`; pruned when a send fails.
    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
    /// Owned only so that the delegate is dropped together with the room.
    _delegate: RoomDelegate,
}
152
153impl Room {
154 pub fn new() -> Arc<Self> {
155 Arc::new_cyclic(|weak_room| {
156 let delegate = RoomDelegate::new(weak_room.clone());
157 Self {
158 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
159 connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
160 remote_audio_track_subscribers: Default::default(),
161 remote_video_track_subscribers: Default::default(),
162 _delegate: delegate,
163 }
164 })
165 }
166
167 pub fn status(&self) -> watch::Receiver<ConnectionState> {
168 self.connection.lock().1.clone()
169 }
170
171 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
172 let url = CFString::new(url);
173 let token = CFString::new(token);
174 let (did_connect, tx, rx) = Self::build_done_callback();
175 unsafe {
176 LKRoomConnect(
177 self.native_room,
178 url.as_concrete_TypeRef(),
179 token.as_concrete_TypeRef(),
180 did_connect,
181 tx,
182 )
183 }
184
185 let this = self.clone();
186 let url = url.to_string();
187 let token = token.to_string();
188 async move {
189 rx.await.unwrap().context("error connecting to room")?;
190 *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
191 Ok(())
192 }
193 }
194
195 fn did_disconnect(&self) {
196 *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
197 }
198
199 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
200 extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
201 unsafe {
202 let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
203
204 if sources.is_null() {
205 let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
206 } else {
207 let sources = CFArray::wrap_under_get_rule(sources)
208 .into_iter()
209 .map(|source| MacOSDisplay::new(*source))
210 .collect();
211
212 let _ = tx.send(Ok(sources));
213 }
214 }
215 }
216
217 let (tx, rx) = oneshot::channel();
218
219 unsafe {
220 LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
221 }
222
223 async move { rx.await.unwrap() }
224 }
225
226 pub fn publish_video_track(
227 self: &Arc<Self>,
228 track: &LocalVideoTrack,
229 ) -> impl Future<Output = Result<LocalTrackPublication>> {
230 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
231 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
232 let tx =
233 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
234 if error.is_null() {
235 let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
236 } else {
237 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
238 let _ = tx.send(Err(anyhow!(error)));
239 }
240 }
241 unsafe {
242 LKRoomPublishVideoTrack(
243 self.native_room,
244 track.0,
245 callback,
246 Box::into_raw(Box::new(tx)) as *mut c_void,
247 );
248 }
249 async { rx.await.unwrap().context("error publishing video track") }
250 }
251
252 pub fn publish_audio_track(
253 self: &Arc<Self>,
254 track: &LocalAudioTrack,
255 ) -> impl Future<Output = Result<LocalTrackPublication>> {
256 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
257 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
258 let tx =
259 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
260 if error.is_null() {
261 let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
262 } else {
263 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
264 let _ = tx.send(Err(anyhow!(error)));
265 }
266 }
267 unsafe {
268 LKRoomPublishAudioTrack(
269 self.native_room,
270 track.0,
271 callback,
272 Box::into_raw(Box::new(tx)) as *mut c_void,
273 );
274 }
275 async { rx.await.unwrap().context("error publishing audio track") }
276 }
277
278 pub fn unpublish_track(&self, publication: LocalTrackPublication) {
279 unsafe {
280 LKRoomUnpublishTrack(self.native_room, publication.0);
281 }
282 }
283
284 pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
285 unsafe {
286 let tracks = LKRoomVideoTracksForRemoteParticipant(
287 self.native_room,
288 CFString::new(participant_id).as_concrete_TypeRef(),
289 );
290
291 if tracks.is_null() {
292 Vec::new()
293 } else {
294 let tracks = CFArray::wrap_under_get_rule(tracks);
295 tracks
296 .into_iter()
297 .map(|native_track| {
298 let native_track = *native_track;
299 let id =
300 CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
301 .to_string();
302 Arc::new(RemoteVideoTrack::new(
303 native_track,
304 id,
305 participant_id.into(),
306 ))
307 })
308 .collect()
309 }
310 }
311 }
312
313 pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
314 unsafe {
315 let tracks = LKRoomAudioTracksForRemoteParticipant(
316 self.native_room,
317 CFString::new(participant_id).as_concrete_TypeRef(),
318 );
319
320 if tracks.is_null() {
321 Vec::new()
322 } else {
323 let tracks = CFArray::wrap_under_get_rule(tracks);
324 tracks
325 .into_iter()
326 .map(|native_track| {
327 let native_track = *native_track;
328 let id =
329 CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
330 .to_string();
331 Arc::new(RemoteAudioTrack::new(
332 native_track,
333 id,
334 participant_id.into(),
335 ))
336 })
337 .collect()
338 }
339 }
340 }
341
342 pub fn remote_audio_track_publications(
343 &self,
344 participant_id: &str,
345 ) -> Vec<Arc<RemoteTrackPublication>> {
346 unsafe {
347 let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant(
348 self.native_room,
349 CFString::new(participant_id).as_concrete_TypeRef(),
350 );
351
352 if tracks.is_null() {
353 Vec::new()
354 } else {
355 let tracks = CFArray::wrap_under_get_rule(tracks);
356 tracks
357 .into_iter()
358 .map(|native_track_publication| {
359 let native_track_publication = *native_track_publication;
360 Arc::new(RemoteTrackPublication::new(native_track_publication))
361 })
362 .collect()
363 }
364 }
365 }
366
367 pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
368 let (tx, rx) = mpsc::unbounded();
369 self.remote_audio_track_subscribers.lock().push(tx);
370 rx
371 }
372
373 pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
374 let (tx, rx) = mpsc::unbounded();
375 self.remote_video_track_subscribers.lock().push(tx);
376 rx
377 }
378
379 fn did_subscribe_to_remote_audio_track(
380 &self,
381 track: RemoteAudioTrack,
382 publication: RemoteTrackPublication,
383 ) {
384 let track = Arc::new(track);
385 let publication = Arc::new(publication);
386 self.remote_audio_track_subscribers.lock().retain(|tx| {
387 tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(
388 track.clone(),
389 publication.clone(),
390 ))
391 .is_ok()
392 });
393 }
394
395 fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
396 self.remote_audio_track_subscribers.lock().retain(|tx| {
397 tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
398 publisher_id: publisher_id.clone(),
399 track_id: track_id.clone(),
400 })
401 .is_ok()
402 });
403 }
404
405 fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
406 self.remote_audio_track_subscribers.lock().retain(|tx| {
407 tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
408 track_id: track_id.clone(),
409 muted,
410 })
411 .is_ok()
412 });
413 }
414
415 // A vec of publisher IDs
416 fn active_speakers_changed(&self, speakers: Vec<String>) {
417 self.remote_audio_track_subscribers
418 .lock()
419 .retain(move |tx| {
420 tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
421 speakers: speakers.clone(),
422 })
423 .is_ok()
424 });
425 }
426
427 fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
428 let track = Arc::new(track);
429 self.remote_video_track_subscribers.lock().retain(|tx| {
430 tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
431 .is_ok()
432 });
433 }
434
435 fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
436 self.remote_video_track_subscribers.lock().retain(|tx| {
437 tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
438 publisher_id: publisher_id.clone(),
439 track_id: track_id.clone(),
440 })
441 .is_ok()
442 });
443 }
444
445 fn build_done_callback() -> (
446 extern "C" fn(*mut c_void, CFStringRef),
447 *mut c_void,
448 oneshot::Receiver<Result<()>>,
449 ) {
450 let (tx, rx) = oneshot::channel();
451 extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
452 let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
453 if error.is_null() {
454 let _ = tx.send(Ok(()));
455 } else {
456 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
457 let _ = tx.send(Err(anyhow!(error)));
458 }
459 }
460 (
461 done_callback,
462 Box::into_raw(Box::new(tx)) as *mut c_void,
463 rx,
464 )
465 }
466}
467
468impl Drop for Room {
469 fn drop(&mut self) {
470 unsafe {
471 LKRoomDisconnect(self.native_room);
472 CFRelease(self.native_room);
473 }
474 }
475}
476
/// Owns the native delegate object and the weak room reference handed to it.
struct RoomDelegate {
    /// Pointer returned by `LKRoomDelegateCreate`; released with `CFRelease` on drop.
    native_delegate: *const c_void,
    /// Raw pointer produced by `Weak::into_raw`; turned back into a `Weak` and
    /// dropped when the delegate is dropped.
    weak_room: *const Room,
}
481
482impl RoomDelegate {
483 fn new(weak_room: Weak<Room>) -> Self {
484 let weak_room = Weak::into_raw(weak_room);
485 let native_delegate = unsafe {
486 LKRoomDelegateCreate(
487 weak_room as *mut c_void,
488 Self::on_did_disconnect,
489 Self::on_did_subscribe_to_remote_audio_track,
490 Self::on_did_unsubscribe_from_remote_audio_track,
491 Self::on_mute_change_from_remote_audio_track,
492 Self::on_active_speakers_changed,
493 Self::on_did_subscribe_to_remote_video_track,
494 Self::on_did_unsubscribe_from_remote_video_track,
495 )
496 };
497 Self {
498 native_delegate,
499 weak_room,
500 }
501 }
502
503 extern "C" fn on_did_disconnect(room: *mut c_void) {
504 let room = unsafe { Weak::from_raw(room as *mut Room) };
505 if let Some(room) = room.upgrade() {
506 room.did_disconnect();
507 }
508 let _ = Weak::into_raw(room);
509 }
510
511 extern "C" fn on_did_subscribe_to_remote_audio_track(
512 room: *mut c_void,
513 publisher_id: CFStringRef,
514 track_id: CFStringRef,
515 track: *const c_void,
516 publication: *const c_void,
517 ) {
518 let room = unsafe { Weak::from_raw(room as *mut Room) };
519 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
520 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
521 let track = RemoteAudioTrack::new(track, track_id, publisher_id);
522 let publication = RemoteTrackPublication::new(publication);
523 if let Some(room) = room.upgrade() {
524 room.did_subscribe_to_remote_audio_track(track, publication);
525 }
526 let _ = Weak::into_raw(room);
527 }
528
529 extern "C" fn on_did_unsubscribe_from_remote_audio_track(
530 room: *mut c_void,
531 publisher_id: CFStringRef,
532 track_id: CFStringRef,
533 ) {
534 let room = unsafe { Weak::from_raw(room as *mut Room) };
535 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
536 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
537 if let Some(room) = room.upgrade() {
538 room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
539 }
540 let _ = Weak::into_raw(room);
541 }
542
543 extern "C" fn on_mute_change_from_remote_audio_track(
544 room: *mut c_void,
545 track_id: CFStringRef,
546 muted: bool,
547 ) {
548 let room = unsafe { Weak::from_raw(room as *mut Room) };
549 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
550 if let Some(room) = room.upgrade() {
551 room.mute_changed_from_remote_audio_track(track_id, muted);
552 }
553 let _ = Weak::into_raw(room);
554 }
555
556 extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) {
557 if participants.is_null() {
558 return;
559 }
560
561 let room = unsafe { Weak::from_raw(room as *mut Room) };
562 let speakers = unsafe {
563 CFArray::wrap_under_get_rule(participants)
564 .into_iter()
565 .map(
566 |speaker: core_foundation::base::ItemRef<'_, *const c_void>| {
567 CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string()
568 },
569 )
570 .collect()
571 };
572
573 if let Some(room) = room.upgrade() {
574 room.active_speakers_changed(speakers);
575 }
576 let _ = Weak::into_raw(room);
577 }
578
579 extern "C" fn on_did_subscribe_to_remote_video_track(
580 room: *mut c_void,
581 publisher_id: CFStringRef,
582 track_id: CFStringRef,
583 track: *const c_void,
584 ) {
585 let room = unsafe { Weak::from_raw(room as *mut Room) };
586 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
587 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
588 let track = RemoteVideoTrack::new(track, track_id, publisher_id);
589 if let Some(room) = room.upgrade() {
590 room.did_subscribe_to_remote_video_track(track);
591 }
592 let _ = Weak::into_raw(room);
593 }
594
595 extern "C" fn on_did_unsubscribe_from_remote_video_track(
596 room: *mut c_void,
597 publisher_id: CFStringRef,
598 track_id: CFStringRef,
599 ) {
600 let room = unsafe { Weak::from_raw(room as *mut Room) };
601 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
602 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
603 if let Some(room) = room.upgrade() {
604 room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
605 }
606 let _ = Weak::into_raw(room);
607 }
608}
609
610impl Drop for RoomDelegate {
611 fn drop(&mut self) {
612 unsafe {
613 CFRelease(self.native_delegate);
614 let _ = Weak::from_raw(self.weak_room);
615 }
616 }
617}
618
619pub struct LocalAudioTrack(*const c_void);
620
621impl LocalAudioTrack {
622 pub fn create() -> Self {
623 Self(unsafe { LKLocalAudioTrackCreateTrack() })
624 }
625}
626
627impl Drop for LocalAudioTrack {
628 fn drop(&mut self) {
629 unsafe { CFRelease(self.0) }
630 }
631}
632
633pub struct LocalVideoTrack(*const c_void);
634
635impl LocalVideoTrack {
636 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
637 Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
638 }
639}
640
641impl Drop for LocalVideoTrack {
642 fn drop(&mut self) {
643 unsafe { CFRelease(self.0) }
644 }
645}
646
647pub struct LocalTrackPublication(*const c_void);
648
649impl LocalTrackPublication {
650 pub fn new(native_track_publication: *const c_void) -> Self {
651 unsafe {
652 CFRetain(native_track_publication);
653 }
654 Self(native_track_publication)
655 }
656
657 pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
658 let (tx, rx) = futures::channel::oneshot::channel();
659
660 extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
661 let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
662 if error.is_null() {
663 tx.send(Ok(())).ok();
664 } else {
665 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
666 tx.send(Err(anyhow!(error))).ok();
667 }
668 }
669
670 unsafe {
671 LKLocalTrackPublicationSetMute(
672 self.0,
673 muted,
674 complete_callback,
675 Box::into_raw(Box::new(tx)) as *mut c_void,
676 )
677 }
678
679 async move { rx.await.unwrap() }
680 }
681}
682
683impl Drop for LocalTrackPublication {
684 fn drop(&mut self) {
685 unsafe { CFRelease(self.0) }
686 }
687}
688
689pub struct RemoteTrackPublication(*const c_void);
690
691impl RemoteTrackPublication {
692 pub fn new(native_track_publication: *const c_void) -> Self {
693 unsafe {
694 CFRetain(native_track_publication);
695 }
696 Self(native_track_publication)
697 }
698
699 pub fn sid(&self) -> String {
700 unsafe { CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid(self.0)).to_string() }
701 }
702
703 pub fn is_muted(&self) -> bool {
704 unsafe { LKRemoteTrackPublicationIsMuted(self.0) }
705 }
706
707 pub fn set_enabled(&self, enabled: bool) -> impl Future<Output = Result<()>> {
708 let (tx, rx) = futures::channel::oneshot::channel();
709
710 extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
711 let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
712 if error.is_null() {
713 tx.send(Ok(())).ok();
714 } else {
715 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
716 tx.send(Err(anyhow!(error))).ok();
717 }
718 }
719
720 unsafe {
721 LKRemoteTrackPublicationSetEnabled(
722 self.0,
723 enabled,
724 complete_callback,
725 Box::into_raw(Box::new(tx)) as *mut c_void,
726 )
727 }
728
729 async move { rx.await.unwrap() }
730 }
731}
732
733impl Drop for RemoteTrackPublication {
734 fn drop(&mut self) {
735 unsafe { CFRelease(self.0) }
736 }
737}
738
739#[derive(Debug)]
740pub struct RemoteAudioTrack {
741 _native_track: *const c_void,
742 sid: Sid,
743 publisher_id: String,
744}
745
746impl RemoteAudioTrack {
747 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
748 unsafe {
749 CFRetain(native_track);
750 }
751 Self {
752 _native_track: native_track,
753 sid,
754 publisher_id,
755 }
756 }
757
758 pub fn sid(&self) -> &str {
759 &self.sid
760 }
761
762 pub fn publisher_id(&self) -> &str {
763 &self.publisher_id
764 }
765
766 pub fn enable(&self) -> impl Future<Output = Result<()>> {
767 async { Ok(()) }
768 }
769
770 pub fn disable(&self) -> impl Future<Output = Result<()>> {
771 async { Ok(()) }
772 }
773}
774
775#[derive(Debug)]
776pub struct RemoteVideoTrack {
777 native_track: *const c_void,
778 sid: Sid,
779 publisher_id: String,
780}
781
782impl RemoteVideoTrack {
783 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
784 unsafe {
785 CFRetain(native_track);
786 }
787 Self {
788 native_track,
789 sid,
790 publisher_id,
791 }
792 }
793
794 pub fn sid(&self) -> &str {
795 &self.sid
796 }
797
798 pub fn publisher_id(&self) -> &str {
799 &self.publisher_id
800 }
801
802 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
803 extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
804 unsafe {
805 let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
806 let buffer = CVImageBuffer::wrap_under_get_rule(frame);
807 let result = tx.try_broadcast(Frame(buffer));
808 let _ = Box::into_raw(tx);
809 match result {
810 Ok(_) => true,
811 Err(async_broadcast::TrySendError::Closed(_))
812 | Err(async_broadcast::TrySendError::Inactive(_)) => {
813 log::warn!("no active receiver for frame");
814 false
815 }
816 Err(async_broadcast::TrySendError::Full(_)) => {
817 log::warn!("skipping frame as receiver is not keeping up");
818 true
819 }
820 }
821 }
822 }
823
824 extern "C" fn on_drop(callback_data: *mut c_void) {
825 unsafe {
826 let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
827 }
828 }
829
830 let (tx, rx) = async_broadcast::broadcast(64);
831 unsafe {
832 let renderer = LKVideoRendererCreate(
833 Box::into_raw(Box::new(tx)) as *mut c_void,
834 on_frame,
835 on_drop,
836 );
837 LKVideoTrackAddRenderer(self.native_track, renderer);
838 rx
839 }
840 }
841}
842
843impl Drop for RemoteVideoTrack {
844 fn drop(&mut self) {
845 unsafe { CFRelease(self.native_track) }
846 }
847}
848
/// Update sent to the receivers returned by `Room::remote_video_track_updates`.
pub enum RemoteVideoTrackUpdate {
    /// A remote video track was subscribed.
    Subscribed(Arc<RemoteVideoTrack>),
    /// A remote video track was unsubscribed.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
853
/// Update sent to the receivers returned by `Room::remote_audio_track_updates`.
pub enum RemoteAudioTrackUpdate {
    /// The set of active speakers changed; `speakers` holds publisher ids.
    ActiveSpeakersChanged { speakers: Vec<Sid> },
    /// A remote audio track changed its mute state.
    MuteChanged { track_id: Sid, muted: bool },
    /// A remote audio track was subscribed, together with its publication.
    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
    /// A remote audio track was unsubscribed.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
860
861pub struct MacOSDisplay(*const c_void);
862
863impl MacOSDisplay {
864 fn new(ptr: *const c_void) -> Self {
865 unsafe {
866 CFRetain(ptr);
867 }
868 Self(ptr)
869 }
870}
871
872impl Drop for MacOSDisplay {
873 fn drop(&mut self) {
874 unsafe { CFRelease(self.0) }
875 }
876}
877
878#[derive(Clone)]
879pub struct Frame(CVImageBuffer);
880
881impl Frame {
882 pub fn width(&self) -> usize {
883 self.0.width()
884 }
885
886 pub fn height(&self) -> usize {
887 self.0.height()
888 }
889
890 pub fn image(&self) -> CVImageBuffer {
891 self.0.clone()
892 }
893}