1use crate::{ConnectionState, RoomUpdate, Sid};
2use anyhow::{anyhow, Context, Result};
3use async_trait::async_trait;
4use collections::{BTreeMap, HashMap};
5use futures::Stream;
6use gpui::BackgroundExecutor;
7use live_kit_server::{proto, token};
8use media::core_video::CVImageBuffer;
9use parking_lot::Mutex;
10use postage::watch;
11use std::{
12 future::Future,
13 mem,
14 sync::{
15 atomic::{AtomicBool, Ordering::SeqCst},
16 Arc,
17 },
18};
19
20static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
21
22pub struct TestServer {
23 pub url: String,
24 pub api_key: String,
25 pub secret_key: String,
26 rooms: Mutex<HashMap<String, TestServerRoom>>,
27 executor: BackgroundExecutor,
28}
29
30impl TestServer {
31 pub fn create(
32 url: String,
33 api_key: String,
34 secret_key: String,
35 executor: BackgroundExecutor,
36 ) -> Result<Arc<TestServer>> {
37 let mut servers = SERVERS.lock();
38 if servers.contains_key(&url) {
39 Err(anyhow!("a server with url {:?} already exists", url))
40 } else {
41 let server = Arc::new(TestServer {
42 url: url.clone(),
43 api_key,
44 secret_key,
45 rooms: Default::default(),
46 executor,
47 });
48 servers.insert(url, server.clone());
49 Ok(server)
50 }
51 }
52
53 fn get(url: &str) -> Result<Arc<TestServer>> {
54 Ok(SERVERS
55 .lock()
56 .get(url)
57 .ok_or_else(|| anyhow!("no server found for url"))?
58 .clone())
59 }
60
61 pub fn teardown(&self) -> Result<()> {
62 SERVERS
63 .lock()
64 .remove(&self.url)
65 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
66 Ok(())
67 }
68
69 pub fn create_api_client(&self) -> TestApiClient {
70 TestApiClient {
71 url: self.url.clone(),
72 }
73 }
74
75 pub async fn create_room(&self, room: String) -> Result<()> {
76 self.executor.simulate_random_delay().await;
77 let mut server_rooms = self.rooms.lock();
78 if server_rooms.contains_key(&room) {
79 Err(anyhow!("room {:?} already exists", room))
80 } else {
81 server_rooms.insert(room, Default::default());
82 Ok(())
83 }
84 }
85
86 async fn delete_room(&self, room: String) -> Result<()> {
87 // TODO: clear state associated with all `Room`s.
88 self.executor.simulate_random_delay().await;
89 let mut server_rooms = self.rooms.lock();
90 server_rooms
91 .remove(&room)
92 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
93 Ok(())
94 }
95
96 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
97 self.executor.simulate_random_delay().await;
98 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
99 let identity = claims.sub.unwrap().to_string();
100 let room_name = claims.video.room.unwrap();
101 let mut server_rooms = self.rooms.lock();
102 let room = (*server_rooms).entry(room_name.to_string()).or_default();
103
104 if room.client_rooms.contains_key(&identity) {
105 Err(anyhow!(
106 "{:?} attempted to join room {:?} twice",
107 identity,
108 room_name
109 ))
110 } else {
111 for track in &room.video_tracks {
112 client_room
113 .0
114 .lock()
115 .updates_tx
116 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
117 .unwrap();
118 }
119 room.client_rooms.insert(identity, client_room);
120 Ok(())
121 }
122 }
123
124 async fn leave_room(&self, token: String) -> Result<()> {
125 self.executor.simulate_random_delay().await;
126 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
127 let identity = claims.sub.unwrap().to_string();
128 let room_name = claims.video.room.unwrap();
129 let mut server_rooms = self.rooms.lock();
130 let room = server_rooms
131 .get_mut(&*room_name)
132 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
133 room.client_rooms.remove(&identity).ok_or_else(|| {
134 anyhow!(
135 "{:?} attempted to leave room {:?} before joining it",
136 identity,
137 room_name
138 )
139 })?;
140 Ok(())
141 }
142
143 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
144 // TODO: clear state associated with the `Room`.
145
146 self.executor.simulate_random_delay().await;
147 let mut server_rooms = self.rooms.lock();
148 let room = server_rooms
149 .get_mut(&room_name)
150 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
151 room.client_rooms.remove(&identity).ok_or_else(|| {
152 anyhow!(
153 "participant {:?} did not join room {:?}",
154 identity,
155 room_name
156 )
157 })?;
158 Ok(())
159 }
160
161 async fn update_participant(
162 &self,
163 room_name: String,
164 identity: String,
165 permission: proto::ParticipantPermission,
166 ) -> Result<()> {
167 self.executor.simulate_random_delay().await;
168 let mut server_rooms = self.rooms.lock();
169 let room = server_rooms
170 .get_mut(&room_name)
171 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
172 room.participant_permissions.insert(identity, permission);
173 Ok(())
174 }
175
176 pub async fn disconnect_client(&self, client_identity: String) {
177 self.executor.simulate_random_delay().await;
178 let mut server_rooms = self.rooms.lock();
179 for room in server_rooms.values_mut() {
180 if let Some(room) = room.client_rooms.remove(&client_identity) {
181 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
182 }
183 }
184 }
185
186 async fn publish_video_track(
187 &self,
188 token: String,
189 local_track: LocalVideoTrack,
190 ) -> Result<Sid> {
191 self.executor.simulate_random_delay().await;
192 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
193 let identity = claims.sub.unwrap().to_string();
194 let room_name = claims.video.room.unwrap();
195
196 let mut server_rooms = self.rooms.lock();
197 let room = server_rooms
198 .get_mut(&*room_name)
199 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
200
201 let can_publish = room
202 .participant_permissions
203 .get(&identity)
204 .map(|permission| permission.can_publish)
205 .or(claims.video.can_publish)
206 .unwrap_or(true);
207
208 if !can_publish {
209 return Err(anyhow!("user is not allowed to publish"));
210 }
211
212 let sid = nanoid::nanoid!(17);
213 let track = Arc::new(RemoteVideoTrack {
214 sid: sid.clone(),
215 publisher_id: identity.clone(),
216 frames_rx: local_track.frames_rx.clone(),
217 });
218
219 room.video_tracks.push(track.clone());
220
221 for (id, client_room) in &room.client_rooms {
222 if *id != identity {
223 let _ = client_room
224 .0
225 .lock()
226 .updates_tx
227 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
228 .unwrap();
229 }
230 }
231
232 Ok(sid)
233 }
234
235 async fn publish_audio_track(
236 &self,
237 token: String,
238 _local_track: &LocalAudioTrack,
239 ) -> Result<Sid> {
240 self.executor.simulate_random_delay().await;
241 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
242 let identity = claims.sub.unwrap().to_string();
243 let room_name = claims.video.room.unwrap();
244
245 let mut server_rooms = self.rooms.lock();
246 let room = server_rooms
247 .get_mut(&*room_name)
248 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
249
250 let can_publish = room
251 .participant_permissions
252 .get(&identity)
253 .map(|permission| permission.can_publish)
254 .or(claims.video.can_publish)
255 .unwrap_or(true);
256
257 if !can_publish {
258 return Err(anyhow!("user is not allowed to publish"));
259 }
260
261 let sid = nanoid::nanoid!(17);
262 let track = Arc::new(RemoteAudioTrack {
263 sid: sid.clone(),
264 publisher_id: identity.clone(),
265 });
266
267 let publication = Arc::new(RemoteTrackPublication);
268
269 room.audio_tracks.push(track.clone());
270
271 for (id, client_room) in &room.client_rooms {
272 if *id != identity {
273 let _ = client_room
274 .0
275 .lock()
276 .updates_tx
277 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
278 track.clone(),
279 publication.clone(),
280 ))
281 .unwrap();
282 }
283 }
284
285 Ok(sid)
286 }
287
288 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
289 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
290 let room_name = claims.video.room.unwrap();
291
292 let mut server_rooms = self.rooms.lock();
293 let room = server_rooms
294 .get_mut(&*room_name)
295 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
296 Ok(room.video_tracks.clone())
297 }
298
299 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
300 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
301 let room_name = claims.video.room.unwrap();
302
303 let mut server_rooms = self.rooms.lock();
304 let room = server_rooms
305 .get_mut(&*room_name)
306 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
307 Ok(room.audio_tracks.clone())
308 }
309}
310
311#[derive(Default)]
312struct TestServerRoom {
313 client_rooms: HashMap<Sid, Arc<Room>>,
314 video_tracks: Vec<Arc<RemoteVideoTrack>>,
315 audio_tracks: Vec<Arc<RemoteAudioTrack>>,
316 participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
317}
318
319impl TestServerRoom {}
320
321pub struct TestApiClient {
322 url: String,
323}
324
325#[async_trait]
326impl live_kit_server::api::Client for TestApiClient {
327 fn url(&self) -> &str {
328 &self.url
329 }
330
331 async fn create_room(&self, name: String) -> Result<()> {
332 let server = TestServer::get(&self.url)?;
333 server.create_room(name).await?;
334 Ok(())
335 }
336
337 async fn delete_room(&self, name: String) -> Result<()> {
338 let server = TestServer::get(&self.url)?;
339 server.delete_room(name).await?;
340 Ok(())
341 }
342
343 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
344 let server = TestServer::get(&self.url)?;
345 server.remove_participant(room, identity).await?;
346 Ok(())
347 }
348
349 async fn update_participant(
350 &self,
351 room: String,
352 identity: String,
353 permission: live_kit_server::proto::ParticipantPermission,
354 ) -> Result<()> {
355 let server = TestServer::get(&self.url)?;
356 server
357 .update_participant(room, identity, permission)
358 .await?;
359 Ok(())
360 }
361
362 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
363 let server = TestServer::get(&self.url)?;
364 token::create(
365 &server.api_key,
366 &server.secret_key,
367 Some(identity),
368 token::VideoGrant::to_join(room),
369 )
370 }
371
372 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
373 let server = TestServer::get(&self.url)?;
374 token::create(
375 &server.api_key,
376 &server.secret_key,
377 Some(identity),
378 token::VideoGrant::for_guest(room),
379 )
380 }
381}
382
383struct RoomState {
384 connection: (
385 watch::Sender<ConnectionState>,
386 watch::Receiver<ConnectionState>,
387 ),
388 display_sources: Vec<MacOSDisplay>,
389 updates_tx: async_broadcast::Sender<RoomUpdate>,
390 updates_rx: async_broadcast::Receiver<RoomUpdate>,
391}
392
393pub struct Room(Mutex<RoomState>);
394
395impl Room {
396 pub fn new() -> Arc<Self> {
397 let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
398 Arc::new(Self(Mutex::new(RoomState {
399 connection: watch::channel_with(ConnectionState::Disconnected),
400 display_sources: Default::default(),
401 updates_tx,
402 updates_rx,
403 })))
404 }
405
406 pub fn status(&self) -> watch::Receiver<ConnectionState> {
407 self.0.lock().connection.1.clone()
408 }
409
410 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
411 let this = self.clone();
412 let url = url.to_string();
413 let token = token.to_string();
414 async move {
415 let server = TestServer::get(&url)?;
416 server
417 .join_room(token.clone(), this.clone())
418 .await
419 .context("room join")?;
420 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
421 Ok(())
422 }
423 }
424
425 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
426 let this = self.clone();
427 async move {
428 let server = this.test_server();
429 server.executor.simulate_random_delay().await;
430 Ok(this.0.lock().display_sources.clone())
431 }
432 }
433
434 pub fn publish_video_track(
435 self: &Arc<Self>,
436 track: LocalVideoTrack,
437 ) -> impl Future<Output = Result<LocalTrackPublication>> {
438 let this = self.clone();
439 let track = track.clone();
440 async move {
441 let sid = this
442 .test_server()
443 .publish_video_track(this.token(), track)
444 .await?;
445 Ok(LocalTrackPublication {
446 muted: Default::default(),
447 sid,
448 })
449 }
450 }
451 pub fn publish_audio_track(
452 self: &Arc<Self>,
453 track: LocalAudioTrack,
454 ) -> impl Future<Output = Result<LocalTrackPublication>> {
455 let this = self.clone();
456 let track = track.clone();
457 async move {
458 let sid = this
459 .test_server()
460 .publish_audio_track(this.token(), &track)
461 .await?;
462 Ok(LocalTrackPublication {
463 muted: Default::default(),
464 sid,
465 })
466 }
467 }
468
469 pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
470
471 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
472 if !self.is_connected() {
473 return Vec::new();
474 }
475
476 self.test_server()
477 .audio_tracks(self.token())
478 .unwrap()
479 .into_iter()
480 .filter(|track| track.publisher_id() == publisher_id)
481 .collect()
482 }
483
484 pub fn remote_audio_track_publications(
485 &self,
486 publisher_id: &str,
487 ) -> Vec<Arc<RemoteTrackPublication>> {
488 if !self.is_connected() {
489 return Vec::new();
490 }
491
492 self.test_server()
493 .audio_tracks(self.token())
494 .unwrap()
495 .into_iter()
496 .filter(|track| track.publisher_id() == publisher_id)
497 .map(|_track| Arc::new(RemoteTrackPublication {}))
498 .collect()
499 }
500
501 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
502 if !self.is_connected() {
503 return Vec::new();
504 }
505
506 self.test_server()
507 .video_tracks(self.token())
508 .unwrap()
509 .into_iter()
510 .filter(|track| track.publisher_id() == publisher_id)
511 .collect()
512 }
513
514 pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
515 self.0.lock().updates_rx.clone()
516 }
517
518 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
519 self.0.lock().display_sources = sources;
520 }
521
522 fn test_server(&self) -> Arc<TestServer> {
523 match self.0.lock().connection.1.borrow().clone() {
524 ConnectionState::Disconnected => panic!("must be connected to call this method"),
525 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
526 }
527 }
528
529 fn token(&self) -> String {
530 match self.0.lock().connection.1.borrow().clone() {
531 ConnectionState::Disconnected => panic!("must be connected to call this method"),
532 ConnectionState::Connected { token, .. } => token,
533 }
534 }
535
536 fn is_connected(&self) -> bool {
537 match *self.0.lock().connection.1.borrow() {
538 ConnectionState::Disconnected => false,
539 ConnectionState::Connected { .. } => true,
540 }
541 }
542}
543
544impl Drop for Room {
545 fn drop(&mut self) {
546 if let ConnectionState::Connected { token, .. } = mem::replace(
547 &mut *self.0.lock().connection.0.borrow_mut(),
548 ConnectionState::Disconnected,
549 ) {
550 if let Ok(server) = TestServer::get(&token) {
551 let executor = server.executor.clone();
552 executor
553 .spawn(async move { server.leave_room(token).await.unwrap() })
554 .detach();
555 }
556 }
557 }
558}
559
560#[derive(Clone)]
561pub struct LocalTrackPublication {
562 sid: String,
563 muted: Arc<AtomicBool>,
564}
565
566impl LocalTrackPublication {
567 pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
568 let muted = self.muted.clone();
569 async move {
570 muted.store(mute, SeqCst);
571 Ok(())
572 }
573 }
574
575 pub fn is_muted(&self) -> bool {
576 self.muted.load(SeqCst)
577 }
578
579 pub fn sid(&self) -> String {
580 self.sid.clone()
581 }
582}
583
584pub struct RemoteTrackPublication;
585
586impl RemoteTrackPublication {
587 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
588 async { Ok(()) }
589 }
590
591 pub fn is_muted(&self) -> bool {
592 false
593 }
594
595 pub fn sid(&self) -> String {
596 "".to_string()
597 }
598}
599
600#[derive(Clone)]
601pub struct LocalVideoTrack {
602 frames_rx: async_broadcast::Receiver<Frame>,
603}
604
605impl LocalVideoTrack {
606 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
607 Self {
608 frames_rx: display.frames.1.clone(),
609 }
610 }
611}
612
613#[derive(Clone)]
614pub struct LocalAudioTrack;
615
616impl LocalAudioTrack {
617 pub fn create() -> Self {
618 Self
619 }
620}
621
622#[derive(Debug)]
623pub struct RemoteVideoTrack {
624 sid: Sid,
625 publisher_id: Sid,
626 frames_rx: async_broadcast::Receiver<Frame>,
627}
628
629impl RemoteVideoTrack {
630 pub fn sid(&self) -> &str {
631 &self.sid
632 }
633
634 pub fn publisher_id(&self) -> &str {
635 &self.publisher_id
636 }
637
638 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
639 self.frames_rx.clone()
640 }
641}
642
643#[derive(Debug)]
644pub struct RemoteAudioTrack {
645 sid: Sid,
646 publisher_id: Sid,
647}
648
649impl RemoteAudioTrack {
650 pub fn sid(&self) -> &str {
651 &self.sid
652 }
653
654 pub fn publisher_id(&self) -> &str {
655 &self.publisher_id
656 }
657
658 pub fn enable(&self) -> impl Future<Output = Result<()>> {
659 async { Ok(()) }
660 }
661
662 pub fn disable(&self) -> impl Future<Output = Result<()>> {
663 async { Ok(()) }
664 }
665}
666
667#[derive(Clone)]
668pub struct MacOSDisplay {
669 frames: (
670 async_broadcast::Sender<Frame>,
671 async_broadcast::Receiver<Frame>,
672 ),
673}
674
675impl MacOSDisplay {
676 pub fn new() -> Self {
677 Self {
678 frames: async_broadcast::broadcast(128),
679 }
680 }
681
682 pub fn send_frame(&self, frame: Frame) {
683 self.frames.0.try_broadcast(frame).unwrap();
684 }
685}
686
687#[derive(Clone, Debug, PartialEq, Eq)]
688pub struct Frame {
689 pub label: String,
690 pub width: usize,
691 pub height: usize,
692}
693
694impl Frame {
695 pub fn width(&self) -> usize {
696 self.width
697 }
698
699 pub fn height(&self) -> usize {
700 self.height
701 }
702
703 pub fn image(&self) -> CVImageBuffer {
704 unimplemented!("you can't call this in test mode")
705 }
706}