1use anyhow::{anyhow, Context, Result};
2use async_trait::async_trait;
3use collections::{BTreeMap, HashMap};
4use futures::Stream;
5use gpui::BackgroundExecutor;
6use live_kit_server::{proto, token};
7use media::core_video::CVImageBuffer;
8use parking_lot::Mutex;
9use postage::watch;
10use std::{future::Future, mem, sync::Arc};
11
12static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
13
14pub struct TestServer {
15 pub url: String,
16 pub api_key: String,
17 pub secret_key: String,
18 rooms: Mutex<HashMap<String, TestServerRoom>>,
19 executor: BackgroundExecutor,
20}
21
22impl TestServer {
23 pub fn create(
24 url: String,
25 api_key: String,
26 secret_key: String,
27 executor: BackgroundExecutor,
28 ) -> Result<Arc<TestServer>> {
29 let mut servers = SERVERS.lock();
30 if servers.contains_key(&url) {
31 Err(anyhow!("a server with url {:?} already exists", url))
32 } else {
33 let server = Arc::new(TestServer {
34 url: url.clone(),
35 api_key,
36 secret_key,
37 rooms: Default::default(),
38 executor,
39 });
40 servers.insert(url, server.clone());
41 Ok(server)
42 }
43 }
44
45 fn get(url: &str) -> Result<Arc<TestServer>> {
46 Ok(SERVERS
47 .lock()
48 .get(url)
49 .ok_or_else(|| anyhow!("no server found for url"))?
50 .clone())
51 }
52
53 pub fn teardown(&self) -> Result<()> {
54 SERVERS
55 .lock()
56 .remove(&self.url)
57 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
58 Ok(())
59 }
60
61 pub fn create_api_client(&self) -> TestApiClient {
62 TestApiClient {
63 url: self.url.clone(),
64 }
65 }
66
67 pub async fn create_room(&self, room: String) -> Result<()> {
68 self.executor.simulate_random_delay().await;
69 let mut server_rooms = self.rooms.lock();
70 if server_rooms.contains_key(&room) {
71 Err(anyhow!("room {:?} already exists", room))
72 } else {
73 server_rooms.insert(room, Default::default());
74 Ok(())
75 }
76 }
77
78 async fn delete_room(&self, room: String) -> Result<()> {
79 // TODO: clear state associated with all `Room`s.
80 self.executor.simulate_random_delay().await;
81 let mut server_rooms = self.rooms.lock();
82 server_rooms
83 .remove(&room)
84 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
85 Ok(())
86 }
87
88 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
89 self.executor.simulate_random_delay().await;
90 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
91 let identity = claims.sub.unwrap().to_string();
92 let room_name = claims.video.room.unwrap();
93 let mut server_rooms = self.rooms.lock();
94 let room = (*server_rooms).entry(room_name.to_string()).or_default();
95
96 if room.client_rooms.contains_key(&identity) {
97 Err(anyhow!(
98 "{:?} attempted to join room {:?} twice",
99 identity,
100 room_name
101 ))
102 } else {
103 for track in &room.video_tracks {
104 client_room
105 .0
106 .lock()
107 .video_track_updates
108 .0
109 .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
110 .unwrap();
111 }
112 room.client_rooms.insert(identity, client_room);
113 Ok(())
114 }
115 }
116
117 async fn leave_room(&self, token: String) -> Result<()> {
118 self.executor.simulate_random_delay().await;
119 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
120 let identity = claims.sub.unwrap().to_string();
121 let room_name = claims.video.room.unwrap();
122 let mut server_rooms = self.rooms.lock();
123 let room = server_rooms
124 .get_mut(&*room_name)
125 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
126 room.client_rooms.remove(&identity).ok_or_else(|| {
127 anyhow!(
128 "{:?} attempted to leave room {:?} before joining it",
129 identity,
130 room_name
131 )
132 })?;
133 Ok(())
134 }
135
136 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
137 // TODO: clear state associated with the `Room`.
138
139 self.executor.simulate_random_delay().await;
140 let mut server_rooms = self.rooms.lock();
141 let room = server_rooms
142 .get_mut(&room_name)
143 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
144 room.client_rooms.remove(&identity).ok_or_else(|| {
145 anyhow!(
146 "participant {:?} did not join room {:?}",
147 identity,
148 room_name
149 )
150 })?;
151 Ok(())
152 }
153
154 async fn update_participant(
155 &self,
156 room_name: String,
157 identity: String,
158 permission: proto::ParticipantPermission,
159 ) -> Result<()> {
160 self.executor.simulate_random_delay().await;
161 let mut server_rooms = self.rooms.lock();
162 let room = server_rooms
163 .get_mut(&room_name)
164 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
165 room.participant_permissions.insert(identity, permission);
166 Ok(())
167 }
168
169 pub async fn disconnect_client(&self, client_identity: String) {
170 self.executor.simulate_random_delay().await;
171 let mut server_rooms = self.rooms.lock();
172 for room in server_rooms.values_mut() {
173 if let Some(room) = room.client_rooms.remove(&client_identity) {
174 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
175 }
176 }
177 }
178
179 async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
180 self.executor.simulate_random_delay().await;
181 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
182 let identity = claims.sub.unwrap().to_string();
183 let room_name = claims.video.room.unwrap();
184
185 let mut server_rooms = self.rooms.lock();
186 let room = server_rooms
187 .get_mut(&*room_name)
188 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
189
190 let can_publish = room
191 .participant_permissions
192 .get(&identity)
193 .map(|permission| permission.can_publish)
194 .or(claims.video.can_publish)
195 .unwrap_or(true);
196
197 if !can_publish {
198 return Err(anyhow!("user is not allowed to publish"));
199 }
200
201 let track = Arc::new(RemoteVideoTrack {
202 sid: nanoid::nanoid!(17),
203 publisher_id: identity.clone(),
204 frames_rx: local_track.frames_rx.clone(),
205 });
206
207 room.video_tracks.push(track.clone());
208
209 for (id, client_room) in &room.client_rooms {
210 if *id != identity {
211 let _ = client_room
212 .0
213 .lock()
214 .video_track_updates
215 .0
216 .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
217 .unwrap();
218 }
219 }
220
221 Ok(())
222 }
223
224 async fn publish_audio_track(
225 &self,
226 token: String,
227 _local_track: &LocalAudioTrack,
228 ) -> Result<()> {
229 self.executor.simulate_random_delay().await;
230 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
231 let identity = claims.sub.unwrap().to_string();
232 let room_name = claims.video.room.unwrap();
233
234 let mut server_rooms = self.rooms.lock();
235 let room = server_rooms
236 .get_mut(&*room_name)
237 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
238
239 let can_publish = room
240 .participant_permissions
241 .get(&identity)
242 .map(|permission| permission.can_publish)
243 .or(claims.video.can_publish)
244 .unwrap_or(true);
245
246 if !can_publish {
247 return Err(anyhow!("user is not allowed to publish"));
248 }
249
250 let track = Arc::new(RemoteAudioTrack {
251 sid: nanoid::nanoid!(17),
252 publisher_id: identity.clone(),
253 });
254
255 let publication = Arc::new(RemoteTrackPublication);
256
257 room.audio_tracks.push(track.clone());
258
259 for (id, client_room) in &room.client_rooms {
260 if *id != identity {
261 let _ = client_room
262 .0
263 .lock()
264 .audio_track_updates
265 .0
266 .try_broadcast(RemoteAudioTrackUpdate::Subscribed(
267 track.clone(),
268 publication.clone(),
269 ))
270 .unwrap();
271 }
272 }
273
274 Ok(())
275 }
276
277 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
278 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
279 let room_name = claims.video.room.unwrap();
280
281 let mut server_rooms = self.rooms.lock();
282 let room = server_rooms
283 .get_mut(&*room_name)
284 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
285 Ok(room.video_tracks.clone())
286 }
287
288 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
289 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
290 let room_name = claims.video.room.unwrap();
291
292 let mut server_rooms = self.rooms.lock();
293 let room = server_rooms
294 .get_mut(&*room_name)
295 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
296 Ok(room.audio_tracks.clone())
297 }
298}
299
300#[derive(Default)]
301struct TestServerRoom {
302 client_rooms: HashMap<Sid, Arc<Room>>,
303 video_tracks: Vec<Arc<RemoteVideoTrack>>,
304 audio_tracks: Vec<Arc<RemoteAudioTrack>>,
305 participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
306}
307
308impl TestServerRoom {}
309
310pub struct TestApiClient {
311 url: String,
312}
313
314#[async_trait]
315impl live_kit_server::api::Client for TestApiClient {
316 fn url(&self) -> &str {
317 &self.url
318 }
319
320 async fn create_room(&self, name: String) -> Result<()> {
321 let server = TestServer::get(&self.url)?;
322 server.create_room(name).await?;
323 Ok(())
324 }
325
326 async fn delete_room(&self, name: String) -> Result<()> {
327 let server = TestServer::get(&self.url)?;
328 server.delete_room(name).await?;
329 Ok(())
330 }
331
332 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
333 let server = TestServer::get(&self.url)?;
334 server.remove_participant(room, identity).await?;
335 Ok(())
336 }
337
338 async fn update_participant(
339 &self,
340 room: String,
341 identity: String,
342 permission: live_kit_server::proto::ParticipantPermission,
343 ) -> Result<()> {
344 let server = TestServer::get(&self.url)?;
345 server
346 .update_participant(room, identity, permission)
347 .await?;
348 Ok(())
349 }
350
351 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
352 let server = TestServer::get(&self.url)?;
353 token::create(
354 &server.api_key,
355 &server.secret_key,
356 Some(identity),
357 token::VideoGrant::to_join(room),
358 )
359 }
360
361 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
362 let server = TestServer::get(&self.url)?;
363 token::create(
364 &server.api_key,
365 &server.secret_key,
366 Some(identity),
367 token::VideoGrant::for_guest(room),
368 )
369 }
370}
371
372pub type Sid = String;
373
374struct RoomState {
375 connection: (
376 watch::Sender<ConnectionState>,
377 watch::Receiver<ConnectionState>,
378 ),
379 display_sources: Vec<MacOSDisplay>,
380 audio_track_updates: (
381 async_broadcast::Sender<RemoteAudioTrackUpdate>,
382 async_broadcast::Receiver<RemoteAudioTrackUpdate>,
383 ),
384 video_track_updates: (
385 async_broadcast::Sender<RemoteVideoTrackUpdate>,
386 async_broadcast::Receiver<RemoteVideoTrackUpdate>,
387 ),
388}
389
390#[derive(Clone, Eq, PartialEq)]
391pub enum ConnectionState {
392 Disconnected,
393 Connected { url: String, token: String },
394}
395
396pub struct Room(Mutex<RoomState>);
397
398impl Room {
399 pub fn new() -> Arc<Self> {
400 Arc::new(Self(Mutex::new(RoomState {
401 connection: watch::channel_with(ConnectionState::Disconnected),
402 display_sources: Default::default(),
403 video_track_updates: async_broadcast::broadcast(128),
404 audio_track_updates: async_broadcast::broadcast(128),
405 })))
406 }
407
408 pub fn status(&self) -> watch::Receiver<ConnectionState> {
409 self.0.lock().connection.1.clone()
410 }
411
412 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
413 let this = self.clone();
414 let url = url.to_string();
415 let token = token.to_string();
416 async move {
417 let server = TestServer::get(&url)?;
418 server
419 .join_room(token.clone(), this.clone())
420 .await
421 .context("room join")?;
422 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
423 Ok(())
424 }
425 }
426
427 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
428 let this = self.clone();
429 async move {
430 let server = this.test_server();
431 server.executor.simulate_random_delay().await;
432 Ok(this.0.lock().display_sources.clone())
433 }
434 }
435
436 pub fn publish_video_track(
437 self: &Arc<Self>,
438 track: LocalVideoTrack,
439 ) -> impl Future<Output = Result<LocalTrackPublication>> {
440 let this = self.clone();
441 let track = track.clone();
442 async move {
443 this.test_server()
444 .publish_video_track(this.token(), track)
445 .await?;
446 Ok(LocalTrackPublication)
447 }
448 }
449 pub fn publish_audio_track(
450 self: &Arc<Self>,
451 track: LocalAudioTrack,
452 ) -> impl Future<Output = Result<LocalTrackPublication>> {
453 let this = self.clone();
454 let track = track.clone();
455 async move {
456 this.test_server()
457 .publish_audio_track(this.token(), &track)
458 .await?;
459 Ok(LocalTrackPublication)
460 }
461 }
462
463 pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
464
465 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
466 if !self.is_connected() {
467 return Vec::new();
468 }
469
470 self.test_server()
471 .audio_tracks(self.token())
472 .unwrap()
473 .into_iter()
474 .filter(|track| track.publisher_id() == publisher_id)
475 .collect()
476 }
477
478 pub fn remote_audio_track_publications(
479 &self,
480 publisher_id: &str,
481 ) -> Vec<Arc<RemoteTrackPublication>> {
482 if !self.is_connected() {
483 return Vec::new();
484 }
485
486 self.test_server()
487 .audio_tracks(self.token())
488 .unwrap()
489 .into_iter()
490 .filter(|track| track.publisher_id() == publisher_id)
491 .map(|_track| Arc::new(RemoteTrackPublication {}))
492 .collect()
493 }
494
495 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
496 if !self.is_connected() {
497 return Vec::new();
498 }
499
500 self.test_server()
501 .video_tracks(self.token())
502 .unwrap()
503 .into_iter()
504 .filter(|track| track.publisher_id() == publisher_id)
505 .collect()
506 }
507
508 pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
509 self.0.lock().audio_track_updates.1.clone()
510 }
511
512 pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
513 self.0.lock().video_track_updates.1.clone()
514 }
515
516 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
517 self.0.lock().display_sources = sources;
518 }
519
520 fn test_server(&self) -> Arc<TestServer> {
521 match self.0.lock().connection.1.borrow().clone() {
522 ConnectionState::Disconnected => panic!("must be connected to call this method"),
523 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
524 }
525 }
526
527 fn token(&self) -> String {
528 match self.0.lock().connection.1.borrow().clone() {
529 ConnectionState::Disconnected => panic!("must be connected to call this method"),
530 ConnectionState::Connected { token, .. } => token,
531 }
532 }
533
534 fn is_connected(&self) -> bool {
535 match *self.0.lock().connection.1.borrow() {
536 ConnectionState::Disconnected => false,
537 ConnectionState::Connected { .. } => true,
538 }
539 }
540}
541
542impl Drop for Room {
543 fn drop(&mut self) {
544 if let ConnectionState::Connected { token, .. } = mem::replace(
545 &mut *self.0.lock().connection.0.borrow_mut(),
546 ConnectionState::Disconnected,
547 ) {
548 if let Ok(server) = TestServer::get(&token) {
549 let executor = server.executor.clone();
550 executor
551 .spawn(async move { server.leave_room(token).await.unwrap() })
552 .detach();
553 }
554 }
555 }
556}
557
558pub struct LocalTrackPublication;
559
560impl LocalTrackPublication {
561 pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
562 async { Ok(()) }
563 }
564}
565
566pub struct RemoteTrackPublication;
567
568impl RemoteTrackPublication {
569 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
570 async { Ok(()) }
571 }
572
573 pub fn is_muted(&self) -> bool {
574 false
575 }
576
577 pub fn sid(&self) -> String {
578 "".to_string()
579 }
580}
581
582#[derive(Clone)]
583pub struct LocalVideoTrack {
584 frames_rx: async_broadcast::Receiver<Frame>,
585}
586
587impl LocalVideoTrack {
588 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
589 Self {
590 frames_rx: display.frames.1.clone(),
591 }
592 }
593}
594
595#[derive(Clone)]
596pub struct LocalAudioTrack;
597
598impl LocalAudioTrack {
599 pub fn create() -> Self {
600 Self
601 }
602}
603
604#[derive(Debug)]
605pub struct RemoteVideoTrack {
606 sid: Sid,
607 publisher_id: Sid,
608 frames_rx: async_broadcast::Receiver<Frame>,
609}
610
611impl RemoteVideoTrack {
612 pub fn sid(&self) -> &str {
613 &self.sid
614 }
615
616 pub fn publisher_id(&self) -> &str {
617 &self.publisher_id
618 }
619
620 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
621 self.frames_rx.clone()
622 }
623}
624
625#[derive(Debug)]
626pub struct RemoteAudioTrack {
627 sid: Sid,
628 publisher_id: Sid,
629}
630
631impl RemoteAudioTrack {
632 pub fn sid(&self) -> &str {
633 &self.sid
634 }
635
636 pub fn publisher_id(&self) -> &str {
637 &self.publisher_id
638 }
639
640 pub fn enable(&self) -> impl Future<Output = Result<()>> {
641 async { Ok(()) }
642 }
643
644 pub fn disable(&self) -> impl Future<Output = Result<()>> {
645 async { Ok(()) }
646 }
647}
648
649#[derive(Clone)]
650pub enum RemoteVideoTrackUpdate {
651 Subscribed(Arc<RemoteVideoTrack>),
652 Unsubscribed { publisher_id: Sid, track_id: Sid },
653}
654
655#[derive(Clone)]
656pub enum RemoteAudioTrackUpdate {
657 ActiveSpeakersChanged { speakers: Vec<Sid> },
658 MuteChanged { track_id: Sid, muted: bool },
659 Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
660 Unsubscribed { publisher_id: Sid, track_id: Sid },
661}
662
663#[derive(Clone)]
664pub struct MacOSDisplay {
665 frames: (
666 async_broadcast::Sender<Frame>,
667 async_broadcast::Receiver<Frame>,
668 ),
669}
670
671impl MacOSDisplay {
672 pub fn new() -> Self {
673 Self {
674 frames: async_broadcast::broadcast(128),
675 }
676 }
677
678 pub fn send_frame(&self, frame: Frame) {
679 self.frames.0.try_broadcast(frame).unwrap();
680 }
681}
682
683#[derive(Clone, Debug, PartialEq, Eq)]
684pub struct Frame {
685 pub label: String,
686 pub width: usize,
687 pub height: usize,
688}
689
690impl Frame {
691 pub fn width(&self) -> usize {
692 self.width
693 }
694
695 pub fn height(&self) -> usize {
696 self.height
697 }
698
699 pub fn image(&self) -> CVImageBuffer {
700 unimplemented!("you can't call this in test mode")
701 }
702}