1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use collections::{BTreeMap, HashMap};
4use futures::Stream;
5use gpui::executor::Background;
6use live_kit_server::token;
7use media::core_video::CVImageBuffer;
8use parking_lot::Mutex;
9use postage::watch;
10use std::{future::Future, mem, sync::Arc};
11
12static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
13
14pub struct TestServer {
15 pub url: String,
16 pub api_key: String,
17 pub secret_key: String,
18 rooms: Mutex<HashMap<String, TestServerRoom>>,
19 background: Arc<Background>,
20}
21
22impl TestServer {
23 pub fn create(
24 url: String,
25 api_key: String,
26 secret_key: String,
27 background: Arc<Background>,
28 ) -> Result<Arc<TestServer>> {
29 let mut servers = SERVERS.lock();
30 if servers.contains_key(&url) {
31 Err(anyhow!("a server with url {:?} already exists", url))
32 } else {
33 let server = Arc::new(TestServer {
34 url: url.clone(),
35 api_key,
36 secret_key,
37 rooms: Default::default(),
38 background,
39 });
40 servers.insert(url, server.clone());
41 Ok(server)
42 }
43 }
44
45 fn get(url: &str) -> Result<Arc<TestServer>> {
46 Ok(SERVERS
47 .lock()
48 .get(url)
49 .ok_or_else(|| anyhow!("no server found for url"))?
50 .clone())
51 }
52
53 pub fn teardown(&self) -> Result<()> {
54 SERVERS
55 .lock()
56 .remove(&self.url)
57 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
58 Ok(())
59 }
60
61 pub fn create_api_client(&self) -> TestApiClient {
62 TestApiClient {
63 url: self.url.clone(),
64 }
65 }
66
67 pub async fn create_room(&self, room: String) -> Result<()> {
68 self.background.simulate_random_delay().await;
69 let mut server_rooms = self.rooms.lock();
70 if server_rooms.contains_key(&room) {
71 Err(anyhow!("room {:?} already exists", room))
72 } else {
73 server_rooms.insert(room, Default::default());
74 Ok(())
75 }
76 }
77
78 async fn delete_room(&self, room: String) -> Result<()> {
79 // TODO: clear state associated with all `Room`s.
80 self.background.simulate_random_delay().await;
81 let mut server_rooms = self.rooms.lock();
82 server_rooms
83 .remove(&room)
84 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
85 Ok(())
86 }
87
88 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
89 self.background.simulate_random_delay().await;
90 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
91 let identity = claims.sub.unwrap().to_string();
92 let room_name = claims.video.room.unwrap();
93 let mut server_rooms = self.rooms.lock();
94 let room = (*server_rooms).entry(room_name.to_string()).or_default();
95
96 if room.client_rooms.contains_key(&identity) {
97 Err(anyhow!(
98 "{:?} attempted to join room {:?} twice",
99 identity,
100 room_name
101 ))
102 } else {
103 for track in &room.video_tracks {
104 client_room
105 .0
106 .lock()
107 .video_track_updates
108 .0
109 .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
110 .unwrap();
111 }
112 room.client_rooms.insert(identity, client_room);
113 Ok(())
114 }
115 }
116
117 async fn leave_room(&self, token: String) -> Result<()> {
118 self.background.simulate_random_delay().await;
119 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
120 let identity = claims.sub.unwrap().to_string();
121 let room_name = claims.video.room.unwrap();
122 let mut server_rooms = self.rooms.lock();
123 let room = server_rooms
124 .get_mut(&*room_name)
125 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
126 room.client_rooms.remove(&identity).ok_or_else(|| {
127 anyhow!(
128 "{:?} attempted to leave room {:?} before joining it",
129 identity,
130 room_name
131 )
132 })?;
133 Ok(())
134 }
135
136 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
137 // TODO: clear state associated with the `Room`.
138
139 self.background.simulate_random_delay().await;
140 let mut server_rooms = self.rooms.lock();
141 let room = server_rooms
142 .get_mut(&room_name)
143 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
144 room.client_rooms.remove(&identity).ok_or_else(|| {
145 anyhow!(
146 "participant {:?} did not join room {:?}",
147 identity,
148 room_name
149 )
150 })?;
151 Ok(())
152 }
153
154 pub async fn disconnect_client(&self, client_identity: String) {
155 self.background.simulate_random_delay().await;
156 let mut server_rooms = self.rooms.lock();
157 for room in server_rooms.values_mut() {
158 if let Some(room) = room.client_rooms.remove(&client_identity) {
159 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
160 }
161 }
162 }
163
164 async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
165 self.background.simulate_random_delay().await;
166 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
167 let identity = claims.sub.unwrap().to_string();
168 let room_name = claims.video.room.unwrap();
169
170 let mut server_rooms = self.rooms.lock();
171 let room = server_rooms
172 .get_mut(&*room_name)
173 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
174
175 let track = Arc::new(RemoteVideoTrack {
176 sid: nanoid::nanoid!(17),
177 publisher_id: identity.clone(),
178 frames_rx: local_track.frames_rx.clone(),
179 });
180
181 room.video_tracks.push(track.clone());
182
183 for (id, client_room) in &room.client_rooms {
184 if *id != identity {
185 let _ = client_room
186 .0
187 .lock()
188 .video_track_updates
189 .0
190 .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
191 .unwrap();
192 }
193 }
194
195 Ok(())
196 }
197
198 async fn publish_audio_track(
199 &self,
200 token: String,
201 _local_track: &LocalAudioTrack,
202 ) -> Result<()> {
203 self.background.simulate_random_delay().await;
204 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
205 let identity = claims.sub.unwrap().to_string();
206 let room_name = claims.video.room.unwrap();
207
208 let mut server_rooms = self.rooms.lock();
209 let room = server_rooms
210 .get_mut(&*room_name)
211 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
212
213 let track = Arc::new(RemoteAudioTrack {
214 sid: nanoid::nanoid!(17),
215 publisher_id: identity.clone(),
216 });
217
218 let publication = Arc::new(RemoteTrackPublication);
219
220 room.audio_tracks.push(track.clone());
221
222 for (id, client_room) in &room.client_rooms {
223 if *id != identity {
224 let _ = client_room
225 .0
226 .lock()
227 .audio_track_updates
228 .0
229 .try_broadcast(RemoteAudioTrackUpdate::Subscribed(
230 track.clone(),
231 publication.clone(),
232 ))
233 .unwrap();
234 }
235 }
236
237 Ok(())
238 }
239
240 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
241 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
242 let room_name = claims.video.room.unwrap();
243
244 let mut server_rooms = self.rooms.lock();
245 let room = server_rooms
246 .get_mut(&*room_name)
247 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
248 Ok(room.video_tracks.clone())
249 }
250
251 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
252 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
253 let room_name = claims.video.room.unwrap();
254
255 let mut server_rooms = self.rooms.lock();
256 let room = server_rooms
257 .get_mut(&*room_name)
258 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
259 Ok(room.audio_tracks.clone())
260 }
261}
262
263#[derive(Default)]
264struct TestServerRoom {
265 client_rooms: HashMap<Sid, Arc<Room>>,
266 video_tracks: Vec<Arc<RemoteVideoTrack>>,
267 audio_tracks: Vec<Arc<RemoteAudioTrack>>,
268}
269
270impl TestServerRoom {}
271
272pub struct TestApiClient {
273 url: String,
274}
275
276#[async_trait]
277impl live_kit_server::api::Client for TestApiClient {
278 fn url(&self) -> &str {
279 &self.url
280 }
281
282 async fn create_room(&self, name: String) -> Result<()> {
283 let server = TestServer::get(&self.url)?;
284 server.create_room(name).await?;
285 Ok(())
286 }
287
288 async fn delete_room(&self, name: String) -> Result<()> {
289 let server = TestServer::get(&self.url)?;
290 server.delete_room(name).await?;
291 Ok(())
292 }
293
294 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
295 let server = TestServer::get(&self.url)?;
296 server.remove_participant(room, identity).await?;
297 Ok(())
298 }
299
300 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
301 let server = TestServer::get(&self.url)?;
302 token::create(
303 &server.api_key,
304 &server.secret_key,
305 Some(identity),
306 token::VideoGrant::to_join(room),
307 )
308 }
309
310 fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
311 let server = TestServer::get(&self.url)?;
312 token::create(
313 &server.api_key,
314 &server.secret_key,
315 Some(identity),
316 token::VideoGrant::for_guest(room),
317 )
318 }
319}
320
321pub type Sid = String;
322
323struct RoomState {
324 connection: (
325 watch::Sender<ConnectionState>,
326 watch::Receiver<ConnectionState>,
327 ),
328 display_sources: Vec<MacOSDisplay>,
329 audio_track_updates: (
330 async_broadcast::Sender<RemoteAudioTrackUpdate>,
331 async_broadcast::Receiver<RemoteAudioTrackUpdate>,
332 ),
333 video_track_updates: (
334 async_broadcast::Sender<RemoteVideoTrackUpdate>,
335 async_broadcast::Receiver<RemoteVideoTrackUpdate>,
336 ),
337}
338
339#[derive(Clone, Eq, PartialEq)]
340pub enum ConnectionState {
341 Disconnected,
342 Connected { url: String, token: String },
343}
344
345pub struct Room(Mutex<RoomState>);
346
347impl Room {
348 pub fn new() -> Arc<Self> {
349 Arc::new(Self(Mutex::new(RoomState {
350 connection: watch::channel_with(ConnectionState::Disconnected),
351 display_sources: Default::default(),
352 video_track_updates: async_broadcast::broadcast(128),
353 audio_track_updates: async_broadcast::broadcast(128),
354 })))
355 }
356
357 pub fn status(&self) -> watch::Receiver<ConnectionState> {
358 self.0.lock().connection.1.clone()
359 }
360
361 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
362 let this = self.clone();
363 let url = url.to_string();
364 let token = token.to_string();
365 async move {
366 let server = TestServer::get(&url)?;
367 server.join_room(token.clone(), this.clone()).await?;
368 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
369 Ok(())
370 }
371 }
372
373 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
374 let this = self.clone();
375 async move {
376 let server = this.test_server();
377 server.background.simulate_random_delay().await;
378 Ok(this.0.lock().display_sources.clone())
379 }
380 }
381
382 pub fn publish_video_track(
383 self: &Arc<Self>,
384 track: LocalVideoTrack,
385 ) -> impl Future<Output = Result<LocalTrackPublication>> {
386 let this = self.clone();
387 let track = track.clone();
388 async move {
389 this.test_server()
390 .publish_video_track(this.token(), track)
391 .await?;
392 Ok(LocalTrackPublication)
393 }
394 }
395 pub fn publish_audio_track(
396 self: &Arc<Self>,
397 track: LocalAudioTrack,
398 ) -> impl Future<Output = Result<LocalTrackPublication>> {
399 let this = self.clone();
400 let track = track.clone();
401 async move {
402 this.test_server()
403 .publish_audio_track(this.token(), &track)
404 .await?;
405 Ok(LocalTrackPublication)
406 }
407 }
408
409 pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
410
411 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
412 if !self.is_connected() {
413 return Vec::new();
414 }
415
416 self.test_server()
417 .audio_tracks(self.token())
418 .unwrap()
419 .into_iter()
420 .filter(|track| track.publisher_id() == publisher_id)
421 .collect()
422 }
423
424 pub fn remote_audio_track_publications(
425 &self,
426 publisher_id: &str,
427 ) -> Vec<Arc<RemoteTrackPublication>> {
428 if !self.is_connected() {
429 return Vec::new();
430 }
431
432 self.test_server()
433 .audio_tracks(self.token())
434 .unwrap()
435 .into_iter()
436 .filter(|track| track.publisher_id() == publisher_id)
437 .map(|_track| Arc::new(RemoteTrackPublication {}))
438 .collect()
439 }
440
441 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
442 if !self.is_connected() {
443 return Vec::new();
444 }
445
446 self.test_server()
447 .video_tracks(self.token())
448 .unwrap()
449 .into_iter()
450 .filter(|track| track.publisher_id() == publisher_id)
451 .collect()
452 }
453
454 pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
455 self.0.lock().audio_track_updates.1.clone()
456 }
457
458 pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
459 self.0.lock().video_track_updates.1.clone()
460 }
461
462 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
463 self.0.lock().display_sources = sources;
464 }
465
466 fn test_server(&self) -> Arc<TestServer> {
467 match self.0.lock().connection.1.borrow().clone() {
468 ConnectionState::Disconnected => panic!("must be connected to call this method"),
469 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
470 }
471 }
472
473 fn token(&self) -> String {
474 match self.0.lock().connection.1.borrow().clone() {
475 ConnectionState::Disconnected => panic!("must be connected to call this method"),
476 ConnectionState::Connected { token, .. } => token,
477 }
478 }
479
480 fn is_connected(&self) -> bool {
481 match *self.0.lock().connection.1.borrow() {
482 ConnectionState::Disconnected => false,
483 ConnectionState::Connected { .. } => true,
484 }
485 }
486}
487
488impl Drop for Room {
489 fn drop(&mut self) {
490 if let ConnectionState::Connected { token, .. } = mem::replace(
491 &mut *self.0.lock().connection.0.borrow_mut(),
492 ConnectionState::Disconnected,
493 ) {
494 if let Ok(server) = TestServer::get(&token) {
495 let background = server.background.clone();
496 background
497 .spawn(async move { server.leave_room(token).await.unwrap() })
498 .detach();
499 }
500 }
501 }
502}
503
504pub struct LocalTrackPublication;
505
506impl LocalTrackPublication {
507 pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
508 async { Ok(()) }
509 }
510}
511
512pub struct RemoteTrackPublication;
513
514impl RemoteTrackPublication {
515 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
516 async { Ok(()) }
517 }
518
519 pub fn is_muted(&self) -> bool {
520 false
521 }
522
523 pub fn sid(&self) -> String {
524 "".to_string()
525 }
526}
527
528#[derive(Clone)]
529pub struct LocalVideoTrack {
530 frames_rx: async_broadcast::Receiver<Frame>,
531}
532
533impl LocalVideoTrack {
534 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
535 Self {
536 frames_rx: display.frames.1.clone(),
537 }
538 }
539}
540
541#[derive(Clone)]
542pub struct LocalAudioTrack;
543
544impl LocalAudioTrack {
545 pub fn create() -> Self {
546 Self
547 }
548}
549
550pub struct RemoteVideoTrack {
551 sid: Sid,
552 publisher_id: Sid,
553 frames_rx: async_broadcast::Receiver<Frame>,
554}
555
556impl RemoteVideoTrack {
557 pub fn sid(&self) -> &str {
558 &self.sid
559 }
560
561 pub fn publisher_id(&self) -> &str {
562 &self.publisher_id
563 }
564
565 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
566 self.frames_rx.clone()
567 }
568}
569
570#[derive(Debug)]
571pub struct RemoteAudioTrack {
572 sid: Sid,
573 publisher_id: Sid,
574}
575
576impl RemoteAudioTrack {
577 pub fn sid(&self) -> &str {
578 &self.sid
579 }
580
581 pub fn publisher_id(&self) -> &str {
582 &self.publisher_id
583 }
584
585 pub fn enable(&self) -> impl Future<Output = Result<()>> {
586 async { Ok(()) }
587 }
588
589 pub fn disable(&self) -> impl Future<Output = Result<()>> {
590 async { Ok(()) }
591 }
592}
593
594#[derive(Clone)]
595pub enum RemoteVideoTrackUpdate {
596 Subscribed(Arc<RemoteVideoTrack>),
597 Unsubscribed { publisher_id: Sid, track_id: Sid },
598}
599
600#[derive(Clone)]
601pub enum RemoteAudioTrackUpdate {
602 ActiveSpeakersChanged { speakers: Vec<Sid> },
603 MuteChanged { track_id: Sid, muted: bool },
604 Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
605 Unsubscribed { publisher_id: Sid, track_id: Sid },
606}
607
608#[derive(Clone)]
609pub struct MacOSDisplay {
610 frames: (
611 async_broadcast::Sender<Frame>,
612 async_broadcast::Receiver<Frame>,
613 ),
614}
615
616impl MacOSDisplay {
617 pub fn new() -> Self {
618 Self {
619 frames: async_broadcast::broadcast(128),
620 }
621 }
622
623 pub fn send_frame(&self, frame: Frame) {
624 self.frames.0.try_broadcast(frame).unwrap();
625 }
626}
627
628#[derive(Clone, Debug, PartialEq, Eq)]
629pub struct Frame {
630 pub label: String,
631 pub width: usize,
632 pub height: usize,
633}
634
635impl Frame {
636 pub fn width(&self) -> usize {
637 self.width
638 }
639
640 pub fn height(&self) -> usize {
641 self.height
642 }
643
644 pub fn image(&self) -> CVImageBuffer {
645 unimplemented!("you can't call this in test mode")
646 }
647}