1use crate::{ConnectionState, RoomUpdate, Sid};
2use anyhow::{anyhow, Context, Result};
3use async_trait::async_trait;
4use collections::{BTreeMap, HashMap};
5use futures::Stream;
6use gpui::BackgroundExecutor;
7use live_kit_server::{proto, token};
8use media::core_video::CVImageBuffer;
9use parking_lot::Mutex;
10use postage::watch;
11use std::{
12 future::Future,
13 mem,
14 sync::{
15 atomic::{AtomicBool, Ordering::SeqCst},
16 Arc,
17 },
18};
19
20static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
21
22pub struct TestServer {
23 pub url: String,
24 pub api_key: String,
25 pub secret_key: String,
26 rooms: Mutex<HashMap<String, TestServerRoom>>,
27 executor: BackgroundExecutor,
28}
29
30impl TestServer {
31 pub fn create(
32 url: String,
33 api_key: String,
34 secret_key: String,
35 executor: BackgroundExecutor,
36 ) -> Result<Arc<TestServer>> {
37 let mut servers = SERVERS.lock();
38 if servers.contains_key(&url) {
39 Err(anyhow!("a server with url {:?} already exists", url))
40 } else {
41 let server = Arc::new(TestServer {
42 url: url.clone(),
43 api_key,
44 secret_key,
45 rooms: Default::default(),
46 executor,
47 });
48 servers.insert(url, server.clone());
49 Ok(server)
50 }
51 }
52
53 fn get(url: &str) -> Result<Arc<TestServer>> {
54 Ok(SERVERS
55 .lock()
56 .get(url)
57 .ok_or_else(|| anyhow!("no server found for url"))?
58 .clone())
59 }
60
61 pub fn teardown(&self) -> Result<()> {
62 SERVERS
63 .lock()
64 .remove(&self.url)
65 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
66 Ok(())
67 }
68
69 pub fn create_api_client(&self) -> TestApiClient {
70 TestApiClient {
71 url: self.url.clone(),
72 }
73 }
74
75 pub async fn create_room(&self, room: String) -> Result<()> {
76 self.executor.simulate_random_delay().await;
77 let mut server_rooms = self.rooms.lock();
78 if server_rooms.contains_key(&room) {
79 Err(anyhow!("room {:?} already exists", room))
80 } else {
81 server_rooms.insert(room, Default::default());
82 Ok(())
83 }
84 }
85
86 async fn delete_room(&self, room: String) -> Result<()> {
87 // TODO: clear state associated with all `Room`s.
88 self.executor.simulate_random_delay().await;
89 let mut server_rooms = self.rooms.lock();
90 server_rooms
91 .remove(&room)
92 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
93 Ok(())
94 }
95
96 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
97 self.executor.simulate_random_delay().await;
98 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
99 let identity = claims.sub.unwrap().to_string();
100 let room_name = claims.video.room.unwrap();
101 let mut server_rooms = self.rooms.lock();
102 let room = (*server_rooms).entry(room_name.to_string()).or_default();
103
104 if room.client_rooms.contains_key(&identity) {
105 Err(anyhow!(
106 "{:?} attempted to join room {:?} twice",
107 identity,
108 room_name
109 ))
110 } else {
111 for track in &room.video_tracks {
112 client_room
113 .0
114 .lock()
115 .updates_tx
116 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
117 .unwrap();
118 }
119 room.client_rooms.insert(identity, client_room);
120 Ok(())
121 }
122 }
123
124 async fn leave_room(&self, token: String) -> Result<()> {
125 self.executor.simulate_random_delay().await;
126 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
127 let identity = claims.sub.unwrap().to_string();
128 let room_name = claims.video.room.unwrap();
129 let mut server_rooms = self.rooms.lock();
130 let room = server_rooms
131 .get_mut(&*room_name)
132 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
133 room.client_rooms.remove(&identity).ok_or_else(|| {
134 anyhow!(
135 "{:?} attempted to leave room {:?} before joining it",
136 identity,
137 room_name
138 )
139 })?;
140 Ok(())
141 }
142
143 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
144 // TODO: clear state associated with the `Room`.
145
146 self.executor.simulate_random_delay().await;
147 let mut server_rooms = self.rooms.lock();
148 let room = server_rooms
149 .get_mut(&room_name)
150 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
151 room.client_rooms.remove(&identity).ok_or_else(|| {
152 anyhow!(
153 "participant {:?} did not join room {:?}",
154 identity,
155 room_name
156 )
157 })?;
158 Ok(())
159 }
160
161 async fn update_participant(
162 &self,
163 room_name: String,
164 identity: String,
165 permission: proto::ParticipantPermission,
166 ) -> Result<()> {
167 self.executor.simulate_random_delay().await;
168 let mut server_rooms = self.rooms.lock();
169 let room = server_rooms
170 .get_mut(&room_name)
171 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
172 room.participant_permissions.insert(identity, permission);
173 Ok(())
174 }
175
176 pub async fn disconnect_client(&self, client_identity: String) {
177 self.executor.simulate_random_delay().await;
178 let mut server_rooms = self.rooms.lock();
179 for room in server_rooms.values_mut() {
180 if let Some(room) = room.client_rooms.remove(&client_identity) {
181 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
182 }
183 }
184 }
185
186 async fn publish_video_track(
187 &self,
188 token: String,
189 local_track: LocalVideoTrack,
190 ) -> Result<Sid> {
191 self.executor.simulate_random_delay().await;
192 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
193 let identity = claims.sub.unwrap().to_string();
194 let room_name = claims.video.room.unwrap();
195
196 let mut server_rooms = self.rooms.lock();
197 let room = server_rooms
198 .get_mut(&*room_name)
199 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
200
201 let can_publish = room
202 .participant_permissions
203 .get(&identity)
204 .map(|permission| permission.can_publish)
205 .or(claims.video.can_publish)
206 .unwrap_or(true);
207
208 if !can_publish {
209 return Err(anyhow!("user is not allowed to publish"));
210 }
211
212 let sid = nanoid::nanoid!(17);
213 let track = Arc::new(RemoteVideoTrack {
214 sid: sid.clone(),
215 publisher_id: identity.clone(),
216 frames_rx: local_track.frames_rx.clone(),
217 });
218
219 room.video_tracks.push(track.clone());
220
221 for (id, client_room) in &room.client_rooms {
222 if *id != identity {
223 let _ = client_room
224 .0
225 .lock()
226 .updates_tx
227 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
228 .unwrap();
229 }
230 }
231
232 Ok(sid)
233 }
234
235 async fn publish_audio_track(
236 &self,
237 token: String,
238 _local_track: &LocalAudioTrack,
239 ) -> Result<Sid> {
240 self.executor.simulate_random_delay().await;
241 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
242 let identity = claims.sub.unwrap().to_string();
243 let room_name = claims.video.room.unwrap();
244
245 let mut server_rooms = self.rooms.lock();
246 let room = server_rooms
247 .get_mut(&*room_name)
248 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
249
250 let can_publish = room
251 .participant_permissions
252 .get(&identity)
253 .map(|permission| permission.can_publish)
254 .or(claims.video.can_publish)
255 .unwrap_or(true);
256
257 if !can_publish {
258 return Err(anyhow!("user is not allowed to publish"));
259 }
260
261 let sid = nanoid::nanoid!(17);
262 let track = Arc::new(RemoteAudioTrack {
263 sid: sid.clone(),
264 publisher_id: identity.clone(),
265 running: AtomicBool::new(true),
266 });
267
268 let publication = Arc::new(RemoteTrackPublication);
269
270 room.audio_tracks.push(track.clone());
271
272 for (id, client_room) in &room.client_rooms {
273 if *id != identity {
274 let _ = client_room
275 .0
276 .lock()
277 .updates_tx
278 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
279 track.clone(),
280 publication.clone(),
281 ))
282 .unwrap();
283 }
284 }
285
286 Ok(sid)
287 }
288
289 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
290 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
291 let room_name = claims.video.room.unwrap();
292
293 let mut server_rooms = self.rooms.lock();
294 let room = server_rooms
295 .get_mut(&*room_name)
296 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
297 Ok(room.video_tracks.clone())
298 }
299
300 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
301 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
302 let room_name = claims.video.room.unwrap();
303
304 let mut server_rooms = self.rooms.lock();
305 let room = server_rooms
306 .get_mut(&*room_name)
307 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
308 Ok(room.audio_tracks.clone())
309 }
310}
311
312#[derive(Default)]
313struct TestServerRoom {
314 client_rooms: HashMap<Sid, Arc<Room>>,
315 video_tracks: Vec<Arc<RemoteVideoTrack>>,
316 audio_tracks: Vec<Arc<RemoteAudioTrack>>,
317 participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
318}
319
320impl TestServerRoom {}
321
322pub struct TestApiClient {
323 url: String,
324}
325
326#[async_trait]
327impl live_kit_server::api::Client for TestApiClient {
328 fn url(&self) -> &str {
329 &self.url
330 }
331
332 async fn create_room(&self, name: String) -> Result<()> {
333 let server = TestServer::get(&self.url)?;
334 server.create_room(name).await?;
335 Ok(())
336 }
337
338 async fn delete_room(&self, name: String) -> Result<()> {
339 let server = TestServer::get(&self.url)?;
340 server.delete_room(name).await?;
341 Ok(())
342 }
343
344 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
345 let server = TestServer::get(&self.url)?;
346 server.remove_participant(room, identity).await?;
347 Ok(())
348 }
349
350 async fn update_participant(
351 &self,
352 room: String,
353 identity: String,
354 permission: live_kit_server::proto::ParticipantPermission,
355 ) -> Result<()> {
356 let server = TestServer::get(&self.url)?;
357 server
358 .update_participant(room, identity, permission)
359 .await?;
360 Ok(())
361 }
362
363 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
364 let server = TestServer::get(&self.url)?;
365 token::create(
366 &server.api_key,
367 &server.secret_key,
368 Some(identity),
369 token::VideoGrant::to_join(room),
370 )
371 }
372
373 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
374 let server = TestServer::get(&self.url)?;
375 token::create(
376 &server.api_key,
377 &server.secret_key,
378 Some(identity),
379 token::VideoGrant::for_guest(room),
380 )
381 }
382}
383
384struct RoomState {
385 connection: (
386 watch::Sender<ConnectionState>,
387 watch::Receiver<ConnectionState>,
388 ),
389 display_sources: Vec<MacOSDisplay>,
390 updates_tx: async_broadcast::Sender<RoomUpdate>,
391 updates_rx: async_broadcast::Receiver<RoomUpdate>,
392}
393
394pub struct Room(Mutex<RoomState>);
395
396impl Room {
397 pub fn new() -> Arc<Self> {
398 let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
399 Arc::new(Self(Mutex::new(RoomState {
400 connection: watch::channel_with(ConnectionState::Disconnected),
401 display_sources: Default::default(),
402 updates_tx,
403 updates_rx,
404 })))
405 }
406
407 pub fn status(&self) -> watch::Receiver<ConnectionState> {
408 self.0.lock().connection.1.clone()
409 }
410
411 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
412 let this = self.clone();
413 let url = url.to_string();
414 let token = token.to_string();
415 async move {
416 let server = TestServer::get(&url)?;
417 server
418 .join_room(token.clone(), this.clone())
419 .await
420 .context("room join")?;
421 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
422 Ok(())
423 }
424 }
425
426 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
427 let this = self.clone();
428 async move {
429 let server = this.test_server();
430 server.executor.simulate_random_delay().await;
431 Ok(this.0.lock().display_sources.clone())
432 }
433 }
434
435 pub fn publish_video_track(
436 self: &Arc<Self>,
437 track: LocalVideoTrack,
438 ) -> impl Future<Output = Result<LocalTrackPublication>> {
439 let this = self.clone();
440 let track = track.clone();
441 async move {
442 let sid = this
443 .test_server()
444 .publish_video_track(this.token(), track)
445 .await?;
446 Ok(LocalTrackPublication {
447 muted: Default::default(),
448 sid,
449 })
450 }
451 }
452 pub fn publish_audio_track(
453 self: &Arc<Self>,
454 track: LocalAudioTrack,
455 ) -> impl Future<Output = Result<LocalTrackPublication>> {
456 let this = self.clone();
457 let track = track.clone();
458 async move {
459 let sid = this
460 .test_server()
461 .publish_audio_track(this.token(), &track)
462 .await?;
463 Ok(LocalTrackPublication {
464 muted: Default::default(),
465 sid,
466 })
467 }
468 }
469
470 pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
471
472 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
473 if !self.is_connected() {
474 return Vec::new();
475 }
476
477 self.test_server()
478 .audio_tracks(self.token())
479 .unwrap()
480 .into_iter()
481 .filter(|track| track.publisher_id() == publisher_id)
482 .collect()
483 }
484
485 pub fn remote_audio_track_publications(
486 &self,
487 publisher_id: &str,
488 ) -> Vec<Arc<RemoteTrackPublication>> {
489 if !self.is_connected() {
490 return Vec::new();
491 }
492
493 self.test_server()
494 .audio_tracks(self.token())
495 .unwrap()
496 .into_iter()
497 .filter(|track| track.publisher_id() == publisher_id)
498 .map(|_track| Arc::new(RemoteTrackPublication {}))
499 .collect()
500 }
501
502 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
503 if !self.is_connected() {
504 return Vec::new();
505 }
506
507 self.test_server()
508 .video_tracks(self.token())
509 .unwrap()
510 .into_iter()
511 .filter(|track| track.publisher_id() == publisher_id)
512 .collect()
513 }
514
515 pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
516 self.0.lock().updates_rx.clone()
517 }
518
519 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
520 self.0.lock().display_sources = sources;
521 }
522
523 fn test_server(&self) -> Arc<TestServer> {
524 match self.0.lock().connection.1.borrow().clone() {
525 ConnectionState::Disconnected => panic!("must be connected to call this method"),
526 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
527 }
528 }
529
530 fn token(&self) -> String {
531 match self.0.lock().connection.1.borrow().clone() {
532 ConnectionState::Disconnected => panic!("must be connected to call this method"),
533 ConnectionState::Connected { token, .. } => token,
534 }
535 }
536
537 fn is_connected(&self) -> bool {
538 match *self.0.lock().connection.1.borrow() {
539 ConnectionState::Disconnected => false,
540 ConnectionState::Connected { .. } => true,
541 }
542 }
543}
544
545impl Drop for Room {
546 fn drop(&mut self) {
547 if let ConnectionState::Connected { token, .. } = mem::replace(
548 &mut *self.0.lock().connection.0.borrow_mut(),
549 ConnectionState::Disconnected,
550 ) {
551 if let Ok(server) = TestServer::get(&token) {
552 let executor = server.executor.clone();
553 executor
554 .spawn(async move { server.leave_room(token).await.unwrap() })
555 .detach();
556 }
557 }
558 }
559}
560
561#[derive(Clone)]
562pub struct LocalTrackPublication {
563 sid: String,
564 muted: Arc<AtomicBool>,
565}
566
567impl LocalTrackPublication {
568 pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
569 let muted = self.muted.clone();
570 async move {
571 muted.store(mute, SeqCst);
572 Ok(())
573 }
574 }
575
576 pub fn is_muted(&self) -> bool {
577 self.muted.load(SeqCst)
578 }
579
580 pub fn sid(&self) -> String {
581 self.sid.clone()
582 }
583}
584
585pub struct RemoteTrackPublication;
586
587impl RemoteTrackPublication {
588 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
589 async { Ok(()) }
590 }
591
592 pub fn is_muted(&self) -> bool {
593 false
594 }
595
596 pub fn sid(&self) -> String {
597 "".to_string()
598 }
599}
600
601#[derive(Clone)]
602pub struct LocalVideoTrack {
603 frames_rx: async_broadcast::Receiver<Frame>,
604}
605
606impl LocalVideoTrack {
607 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
608 Self {
609 frames_rx: display.frames.1.clone(),
610 }
611 }
612}
613
614#[derive(Clone)]
615pub struct LocalAudioTrack;
616
617impl LocalAudioTrack {
618 pub fn create() -> Self {
619 Self
620 }
621}
622
623#[derive(Debug)]
624pub struct RemoteVideoTrack {
625 sid: Sid,
626 publisher_id: Sid,
627 frames_rx: async_broadcast::Receiver<Frame>,
628}
629
630impl RemoteVideoTrack {
631 pub fn sid(&self) -> &str {
632 &self.sid
633 }
634
635 pub fn publisher_id(&self) -> &str {
636 &self.publisher_id
637 }
638
639 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
640 self.frames_rx.clone()
641 }
642}
643
644#[derive(Debug)]
645pub struct RemoteAudioTrack {
646 sid: Sid,
647 publisher_id: Sid,
648 running: AtomicBool,
649}
650
651impl RemoteAudioTrack {
652 pub fn sid(&self) -> &str {
653 &self.sid
654 }
655
656 pub fn publisher_id(&self) -> &str {
657 &self.publisher_id
658 }
659
660 pub fn start(&self) {
661 self.running.store(true, SeqCst);
662 }
663
664 pub fn stop(&self) {
665 self.running.store(false, SeqCst);
666 }
667}
668
669#[derive(Clone)]
670pub struct MacOSDisplay {
671 frames: (
672 async_broadcast::Sender<Frame>,
673 async_broadcast::Receiver<Frame>,
674 ),
675}
676
677impl MacOSDisplay {
678 pub fn new() -> Self {
679 Self {
680 frames: async_broadcast::broadcast(128),
681 }
682 }
683
684 pub fn send_frame(&self, frame: Frame) {
685 self.frames.0.try_broadcast(frame).unwrap();
686 }
687}
688
689#[derive(Clone, Debug, PartialEq, Eq)]
690pub struct Frame {
691 pub label: String,
692 pub width: usize,
693 pub height: usize,
694}
695
696impl Frame {
697 pub fn width(&self) -> usize {
698 self.width
699 }
700
701 pub fn height(&self) -> usize {
702 self.height
703 }
704
705 pub fn image(&self) -> CVImageBuffer {
706 unimplemented!("you can't call this in test mode")
707 }
708}