1use anyhow::{anyhow, Context, Result};
2use core_foundation::{
3 array::{CFArray, CFArrayRef},
4 base::{CFRelease, CFRetain, TCFType},
5 string::{CFString, CFStringRef},
6};
7use futures::{
8 channel::{mpsc, oneshot},
9 Future,
10};
11pub use media::core_video::CVImageBuffer;
12use media::core_video::CVImageBufferRef;
13use parking_lot::Mutex;
14use postage::watch;
15use std::{
16 ffi::c_void,
17 sync::{Arc, Weak},
18};
19
// Native functions implemented outside this file. The Rust side only ever sees opaque
// pointers and Core Foundation references; every callback receives the `callback_data`
// pointer it was registered with as its first argument.
// NOTE(review): the `LK` prefix presumably refers to the LiveKit native bridge — confirm.
extern "C" {
    /// Creates a native room delegate that reports disconnects and remote video track
    /// (un)subscriptions through the given callbacks.
    fn LKRoomDelegateCreate(
        callback_data: *mut c_void,
        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
        on_did_subscribe_to_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
            remote_track: *const c_void,
        ),
        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
            callback_data: *mut c_void,
            publisher_id: CFStringRef,
            track_id: CFStringRef,
        ),
    ) -> *const c_void;

    /// Creates a native room that uses `delegate`.
    fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
    /// Starts connecting `room`; `callback` receives `callback_data` and an error string
    /// (the Rust callback treats a null error as success).
    fn LKRoomConnect(
        room: *const c_void,
        url: CFStringRef,
        token: CFStringRef,
        callback: extern "C" fn(*mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    fn LKRoomDisconnect(room: *const c_void);
    /// Publishes `track`; `callback` receives `callback_data`, the publication pointer and
    /// an error string (the Rust callback treats a null error as success).
    fn LKRoomPublishVideoTrack(
        room: *const c_void,
        track: *const c_void,
        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
        callback_data: *mut c_void,
    );
    fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
    /// Returns the video tracks of a remote participant; callers in this file handle a
    /// null return as "no tracks".
    fn LKRoomVideoTracksForRemoteParticipant(
        room: *const c_void,
        participant_id: CFStringRef,
    ) -> CFArrayRef;

    /// Creates a native video renderer. `on_frame` is handed each frame and returns a
    /// `bool`; `on_drop` is where the Rust side frees `callback_data`.
    // NOTE(review): exactly when the native side invokes `on_drop` is not visible here.
    fn LKVideoRendererCreate(
        callback_data: *mut c_void,
        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
        on_drop: extern "C" fn(callback_data: *mut c_void),
    ) -> *const c_void;

    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;

    /// Requests the available display sources; `callback` receives either a source array
    /// or an error string.
    fn LKDisplaySources(
        callback_data: *mut c_void,
        callback: extern "C" fn(
            callback_data: *mut c_void,
            sources: CFArrayRef,
            error: CFStringRef,
        ),
    );
    /// Creates a screen-share track for a display obtained from `LKDisplaySources`.
    fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
}
77
/// String identifier used for tracks and publishers in this module.
pub type Sid = String;
79
/// Connection status of a [`Room`], as published on its status watch channel.
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// The room is not connected (initial state, and the state after a disconnect).
    Disconnected,
    /// The room connected successfully using the given `url` and `token`.
    Connected { url: String, token: String },
}
85
/// Rust handle to a native room plus the channels used to report its state.
pub struct Room {
    /// Opaque pointer returned by `LKRoomCreate`; disconnected and released on drop.
    native_room: *const c_void,
    /// Both halves of the connection-state watch channel: the sender is used to publish
    /// state changes and the receiver is cloned for each caller of `status`.
    connection: Mutex<(
        watch::Sender<ConnectionState>,
        watch::Receiver<ConnectionState>,
    )>,
    /// Senders for every stream handed out by `remote_video_track_updates`; senders whose
    /// receiver is gone are pruned the next time an update is broadcast.
    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
    /// Owns the native delegate so that it is released together with the room.
    _delegate: RoomDelegate,
}
95
96impl Room {
97 pub fn new() -> Arc<Self> {
98 Arc::new_cyclic(|weak_room| {
99 let delegate = RoomDelegate::new(weak_room.clone());
100 Self {
101 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
102 connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
103 remote_video_track_subscribers: Default::default(),
104 _delegate: delegate,
105 }
106 })
107 }
108
109 pub fn status(&self) -> watch::Receiver<ConnectionState> {
110 self.connection.lock().1.clone()
111 }
112
113 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
114 let url = CFString::new(url);
115 let token = CFString::new(token);
116 let (did_connect, tx, rx) = Self::build_done_callback();
117 unsafe {
118 LKRoomConnect(
119 self.native_room,
120 url.as_concrete_TypeRef(),
121 token.as_concrete_TypeRef(),
122 did_connect,
123 tx,
124 )
125 }
126
127 let this = self.clone();
128 let url = url.to_string();
129 let token = token.to_string();
130 async move {
131 rx.await.unwrap().context("error connecting to room")?;
132 *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
133 Ok(())
134 }
135 }
136
137 fn did_disconnect(&self) {
138 *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
139 }
140
141 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
142 extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
143 unsafe {
144 let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
145
146 if sources.is_null() {
147 let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
148 } else {
149 let sources = CFArray::wrap_under_get_rule(sources)
150 .into_iter()
151 .map(|source| MacOSDisplay::new(*source))
152 .collect();
153
154 let _ = tx.send(Ok(sources));
155 }
156 }
157 }
158
159 let (tx, rx) = oneshot::channel();
160
161 unsafe {
162 LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
163 }
164
165 async move { rx.await.unwrap() }
166 }
167
168 pub fn publish_video_track(
169 self: &Arc<Self>,
170 track: &LocalVideoTrack,
171 ) -> impl Future<Output = Result<LocalTrackPublication>> {
172 let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
173 extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
174 let tx =
175 unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
176 if error.is_null() {
177 let _ = tx.send(Ok(LocalTrackPublication(publication)));
178 } else {
179 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
180 let _ = tx.send(Err(anyhow!(error)));
181 }
182 }
183 unsafe {
184 LKRoomPublishVideoTrack(
185 self.native_room,
186 track.0,
187 callback,
188 Box::into_raw(Box::new(tx)) as *mut c_void,
189 );
190 }
191 async { rx.await.unwrap().context("error publishing video track") }
192 }
193
194 pub fn unpublish_track(&self, publication: LocalTrackPublication) {
195 unsafe {
196 LKRoomUnpublishTrack(self.native_room, publication.0);
197 }
198 }
199
200 pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
201 unsafe {
202 let tracks = LKRoomVideoTracksForRemoteParticipant(
203 self.native_room,
204 CFString::new(participant_id).as_concrete_TypeRef(),
205 );
206
207 if tracks.is_null() {
208 Vec::new()
209 } else {
210 let tracks = CFArray::wrap_under_get_rule(tracks);
211 tracks
212 .into_iter()
213 .map(|native_track| {
214 let native_track = *native_track;
215 let id =
216 CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
217 .to_string();
218 Arc::new(RemoteVideoTrack::new(
219 native_track,
220 id,
221 participant_id.into(),
222 ))
223 })
224 .collect()
225 }
226 }
227 }
228
229 pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
230 let (tx, rx) = mpsc::unbounded();
231 self.remote_video_track_subscribers.lock().push(tx);
232 rx
233 }
234
235 fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
236 let track = Arc::new(track);
237 self.remote_video_track_subscribers.lock().retain(|tx| {
238 tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
239 .is_ok()
240 });
241 }
242
243 fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
244 self.remote_video_track_subscribers.lock().retain(|tx| {
245 tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
246 publisher_id: publisher_id.clone(),
247 track_id: track_id.clone(),
248 })
249 .is_ok()
250 });
251 }
252
253 fn build_done_callback() -> (
254 extern "C" fn(*mut c_void, CFStringRef),
255 *mut c_void,
256 oneshot::Receiver<Result<()>>,
257 ) {
258 let (tx, rx) = oneshot::channel();
259 extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
260 let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
261 if error.is_null() {
262 let _ = tx.send(Ok(()));
263 } else {
264 let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
265 let _ = tx.send(Err(anyhow!(error)));
266 }
267 }
268 (
269 done_callback,
270 Box::into_raw(Box::new(tx)) as *mut c_void,
271 rx,
272 )
273 }
274}
275
276impl Drop for Room {
277 fn drop(&mut self) {
278 unsafe {
279 LKRoomDisconnect(self.native_room);
280 CFRelease(self.native_room);
281 }
282 }
283}
284
/// Owns the native room delegate together with the raw `Weak<Room>` pointer that is
/// passed to the delegate callbacks as their `callback_data`.
struct RoomDelegate {
    /// Opaque pointer returned by `LKRoomDelegateCreate`; released on drop.
    native_delegate: *const c_void,
    /// Result of `Weak::into_raw`; turned back into a `Weak` (and dropped) on drop.
    weak_room: *const Room,
}
289
290impl RoomDelegate {
291 fn new(weak_room: Weak<Room>) -> Self {
292 let weak_room = Weak::into_raw(weak_room);
293 let native_delegate = unsafe {
294 LKRoomDelegateCreate(
295 weak_room as *mut c_void,
296 Self::on_did_disconnect,
297 Self::on_did_subscribe_to_remote_video_track,
298 Self::on_did_unsubscribe_from_remote_video_track,
299 )
300 };
301 Self {
302 native_delegate,
303 weak_room,
304 }
305 }
306
307 extern "C" fn on_did_disconnect(room: *mut c_void) {
308 let room = unsafe { Weak::from_raw(room as *mut Room) };
309 if let Some(room) = room.upgrade() {
310 room.did_disconnect();
311 }
312 let _ = Weak::into_raw(room);
313 }
314
315 extern "C" fn on_did_subscribe_to_remote_video_track(
316 room: *mut c_void,
317 publisher_id: CFStringRef,
318 track_id: CFStringRef,
319 track: *const c_void,
320 ) {
321 let room = unsafe { Weak::from_raw(room as *mut Room) };
322 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
323 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
324 let track = RemoteVideoTrack::new(track, track_id, publisher_id);
325 if let Some(room) = room.upgrade() {
326 room.did_subscribe_to_remote_video_track(track);
327 }
328 let _ = Weak::into_raw(room);
329 }
330
331 extern "C" fn on_did_unsubscribe_from_remote_video_track(
332 room: *mut c_void,
333 publisher_id: CFStringRef,
334 track_id: CFStringRef,
335 ) {
336 let room = unsafe { Weak::from_raw(room as *mut Room) };
337 let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
338 let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
339 if let Some(room) = room.upgrade() {
340 room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
341 }
342 let _ = Weak::into_raw(room);
343 }
344}
345
346impl Drop for RoomDelegate {
347 fn drop(&mut self) {
348 unsafe {
349 CFRelease(self.native_delegate);
350 let _ = Weak::from_raw(self.weak_room);
351 }
352 }
353}
354
/// Opaque pointer to a native local video track; released on drop.
pub struct LocalVideoTrack(*const c_void);
356
357impl LocalVideoTrack {
358 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
359 Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
360 }
361}
362
363impl Drop for LocalVideoTrack {
364 fn drop(&mut self) {
365 unsafe { CFRelease(self.0) }
366 }
367}
368
/// Opaque pointer to a native track publication, as delivered to the publish callback;
/// released on drop.
pub struct LocalTrackPublication(*const c_void);
370
371impl Drop for LocalTrackPublication {
372 fn drop(&mut self) {
373 unsafe { CFRelease(self.0) }
374 }
375}
376
/// A video track published by a remote participant.
#[derive(Debug)]
pub struct RemoteVideoTrack {
    /// Opaque native track pointer; retained in `new` and released on drop.
    native_track: *const c_void,
    /// Identifier of the track.
    sid: Sid,
    /// Identifier of the participant that published the track.
    publisher_id: String,
}
383
384impl RemoteVideoTrack {
385 fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
386 unsafe {
387 CFRetain(native_track);
388 }
389 Self {
390 native_track,
391 sid,
392 publisher_id,
393 }
394 }
395
396 pub fn sid(&self) -> &str {
397 &self.sid
398 }
399
400 pub fn publisher_id(&self) -> &str {
401 &self.publisher_id
402 }
403
404 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
405 extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
406 unsafe {
407 let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
408 let buffer = CVImageBuffer::wrap_under_get_rule(frame);
409 let result = tx.try_broadcast(Frame(buffer));
410 let _ = Box::into_raw(tx);
411 match result {
412 Ok(_) => true,
413 Err(async_broadcast::TrySendError::Closed(_))
414 | Err(async_broadcast::TrySendError::Inactive(_)) => {
415 log::warn!("no active receiver for frame");
416 false
417 }
418 Err(async_broadcast::TrySendError::Full(_)) => {
419 log::warn!("skipping frame as receiver is not keeping up");
420 true
421 }
422 }
423 }
424 }
425
426 extern "C" fn on_drop(callback_data: *mut c_void) {
427 unsafe {
428 let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
429 }
430 }
431
432 let (tx, rx) = async_broadcast::broadcast(64);
433 unsafe {
434 let renderer = LKVideoRendererCreate(
435 Box::into_raw(Box::new(tx)) as *mut c_void,
436 on_frame,
437 on_drop,
438 );
439 LKVideoTrackAddRenderer(self.native_track, renderer);
440 rx
441 }
442 }
443}
444
445impl Drop for RemoteVideoTrack {
446 fn drop(&mut self) {
447 unsafe { CFRelease(self.native_track) }
448 }
449}
450
/// Update delivered to subscribers of `Room::remote_video_track_updates`.
pub enum RemoteVideoTrackUpdate {
    /// A remote video track was subscribed to.
    Subscribed(Arc<RemoteVideoTrack>),
    /// The identified remote video track was unsubscribed from.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
455
/// Opaque pointer to a native display source; retained in `new` and released on drop.
pub struct MacOSDisplay(*const c_void);
457
458impl MacOSDisplay {
459 fn new(ptr: *const c_void) -> Self {
460 unsafe {
461 CFRetain(ptr);
462 }
463 Self(ptr)
464 }
465}
466
467impl Drop for MacOSDisplay {
468 fn drop(&mut self) {
469 unsafe { CFRelease(self.0) }
470 }
471}
472
/// A single video frame, wrapping a `CVImageBuffer`.
#[derive(Clone)]
pub struct Frame(CVImageBuffer);
475
476impl Frame {
477 pub fn width(&self) -> usize {
478 self.0.width()
479 }
480
481 pub fn height(&self) -> usize {
482 self.0.height()
483 }
484
485 pub fn image(&self) -> CVImageBuffer {
486 self.0.clone()
487 }
488}