1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use collections::{BTreeMap, HashMap};
4use futures::Stream;
5use gpui::executor::Background;
6use live_kit_server::token;
7use media::core_video::CVImageBuffer;
8use parking_lot::Mutex;
9use postage::watch;
10use std::{future::Future, mem, sync::Arc};
11
12static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
13
14pub struct TestServer {
15 pub url: String,
16 pub api_key: String,
17 pub secret_key: String,
18 rooms: Mutex<HashMap<String, TestServerRoom>>,
19 background: Arc<Background>,
20}
21
22impl TestServer {
23 pub fn create(
24 url: String,
25 api_key: String,
26 secret_key: String,
27 background: Arc<Background>,
28 ) -> Result<Arc<TestServer>> {
29 let mut servers = SERVERS.lock();
30 if servers.contains_key(&url) {
31 Err(anyhow!("a server with url {:?} already exists", url))
32 } else {
33 let server = Arc::new(TestServer {
34 url: url.clone(),
35 api_key,
36 secret_key,
37 rooms: Default::default(),
38 background,
39 });
40 servers.insert(url, server.clone());
41 Ok(server)
42 }
43 }
44
45 fn get(url: &str) -> Result<Arc<TestServer>> {
46 Ok(SERVERS
47 .lock()
48 .get(url)
49 .ok_or_else(|| anyhow!("no server found for url"))?
50 .clone())
51 }
52
53 pub fn teardown(&self) -> Result<()> {
54 SERVERS
55 .lock()
56 .remove(&self.url)
57 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
58 Ok(())
59 }
60
61 pub fn create_api_client(&self) -> TestApiClient {
62 TestApiClient {
63 url: self.url.clone(),
64 }
65 }
66
67 pub async fn create_room(&self, room: String) -> Result<()> {
68 self.background.simulate_random_delay().await;
69 let mut server_rooms = self.rooms.lock();
70 if server_rooms.contains_key(&room) {
71 Err(anyhow!("room {:?} already exists", room))
72 } else {
73 server_rooms.insert(room, Default::default());
74 Ok(())
75 }
76 }
77
78 async fn delete_room(&self, room: String) -> Result<()> {
79 // TODO: clear state associated with all `Room`s.
80 self.background.simulate_random_delay().await;
81 let mut server_rooms = self.rooms.lock();
82 server_rooms
83 .remove(&room)
84 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
85 Ok(())
86 }
87
88 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
89 self.background.simulate_random_delay().await;
90 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
91 let identity = claims.sub.unwrap().to_string();
92 let room_name = claims.video.room.unwrap();
93 let mut server_rooms = self.rooms.lock();
94 let room = server_rooms
95 .get_mut(&*room_name)
96 .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
97 if room.client_rooms.contains_key(&identity) {
98 Err(anyhow!(
99 "{:?} attempted to join room {:?} twice",
100 identity,
101 room_name
102 ))
103 } else {
104 for track in &room.video_tracks {
105 client_room
106 .0
107 .lock()
108 .video_track_updates
109 .0
110 .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
111 .unwrap();
112 }
113 room.client_rooms.insert(identity, client_room);
114 Ok(())
115 }
116 }
117
118 async fn leave_room(&self, token: String) -> Result<()> {
119 self.background.simulate_random_delay().await;
120 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
121 let identity = claims.sub.unwrap().to_string();
122 let room_name = claims.video.room.unwrap();
123 let mut server_rooms = self.rooms.lock();
124 let room = server_rooms
125 .get_mut(&*room_name)
126 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
127 room.client_rooms.remove(&identity).ok_or_else(|| {
128 anyhow!(
129 "{:?} attempted to leave room {:?} before joining it",
130 identity,
131 room_name
132 )
133 })?;
134 Ok(())
135 }
136
137 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
138 // TODO: clear state associated with the `Room`.
139
140 self.background.simulate_random_delay().await;
141 let mut server_rooms = self.rooms.lock();
142 let room = server_rooms
143 .get_mut(&room_name)
144 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
145 room.client_rooms.remove(&identity).ok_or_else(|| {
146 anyhow!(
147 "participant {:?} did not join room {:?}",
148 identity,
149 room_name
150 )
151 })?;
152 Ok(())
153 }
154
155 pub async fn disconnect_client(&self, client_identity: String) {
156 self.background.simulate_random_delay().await;
157 let mut server_rooms = self.rooms.lock();
158 for room in server_rooms.values_mut() {
159 if let Some(room) = room.client_rooms.remove(&client_identity) {
160 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
161 }
162 }
163 }
164
165 async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
166 self.background.simulate_random_delay().await;
167 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
168 let identity = claims.sub.unwrap().to_string();
169 let room_name = claims.video.room.unwrap();
170
171 let mut server_rooms = self.rooms.lock();
172 let room = server_rooms
173 .get_mut(&*room_name)
174 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
175
176 let track = Arc::new(RemoteVideoTrack {
177 sid: nanoid::nanoid!(17),
178 publisher_id: identity.clone(),
179 frames_rx: local_track.frames_rx.clone(),
180 });
181
182 room.video_tracks.push(track.clone());
183
184 for (id, client_room) in &room.client_rooms {
185 if *id != identity {
186 let _ = client_room
187 .0
188 .lock()
189 .video_track_updates
190 .0
191 .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
192 .unwrap();
193 }
194 }
195
196 Ok(())
197 }
198
199 async fn publish_audio_track(
200 &self,
201 token: String,
202 _local_track: &LocalAudioTrack,
203 ) -> Result<()> {
204 self.background.simulate_random_delay().await;
205 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
206 let identity = claims.sub.unwrap().to_string();
207 let room_name = claims.video.room.unwrap();
208
209 let mut server_rooms = self.rooms.lock();
210 let room = server_rooms
211 .get_mut(&*room_name)
212 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
213
214 let track = Arc::new(RemoteAudioTrack {
215 sid: nanoid::nanoid!(17),
216 publisher_id: identity.clone(),
217 });
218
219 let publication = Arc::new(RemoteTrackPublication);
220
221 room.audio_tracks.push(track.clone());
222
223 for (id, client_room) in &room.client_rooms {
224 if *id != identity {
225 let _ = client_room
226 .0
227 .lock()
228 .audio_track_updates
229 .0
230 .try_broadcast(RemoteAudioTrackUpdate::Subscribed(
231 track.clone(),
232 publication.clone(),
233 ))
234 .unwrap();
235 }
236 }
237
238 Ok(())
239 }
240
241 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
242 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
243 let room_name = claims.video.room.unwrap();
244
245 let mut server_rooms = self.rooms.lock();
246 let room = server_rooms
247 .get_mut(&*room_name)
248 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
249 Ok(room.video_tracks.clone())
250 }
251
252 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
253 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
254 let room_name = claims.video.room.unwrap();
255
256 let mut server_rooms = self.rooms.lock();
257 let room = server_rooms
258 .get_mut(&*room_name)
259 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
260 Ok(room.audio_tracks.clone())
261 }
262}
263
264#[derive(Default)]
265struct TestServerRoom {
266 client_rooms: HashMap<Sid, Arc<Room>>,
267 video_tracks: Vec<Arc<RemoteVideoTrack>>,
268 audio_tracks: Vec<Arc<RemoteAudioTrack>>,
269}
270
271impl TestServerRoom {}
272
/// API client handle that finds its `TestServer` through the server's URL.
pub struct TestApiClient {
    /// URL of the server this client talks to.
    url: String,
}
276
277#[async_trait]
278impl live_kit_server::api::Client for TestApiClient {
279 fn url(&self) -> &str {
280 &self.url
281 }
282
283 async fn create_room(&self, name: String) -> Result<()> {
284 let server = TestServer::get(&self.url)?;
285 server.create_room(name).await?;
286 Ok(())
287 }
288
289 async fn delete_room(&self, name: String) -> Result<()> {
290 let server = TestServer::get(&self.url)?;
291 server.delete_room(name).await?;
292 Ok(())
293 }
294
295 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
296 let server = TestServer::get(&self.url)?;
297 server.remove_participant(room, identity).await?;
298 Ok(())
299 }
300
301 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
302 let server = TestServer::get(&self.url)?;
303 token::create(
304 &server.api_key,
305 &server.secret_key,
306 Some(identity),
307 token::VideoGrant::to_join(room),
308 )
309 }
310}
311
/// String identifier used for track ids and participant identities.
pub type Sid = String;
313
314struct RoomState {
315 connection: (
316 watch::Sender<ConnectionState>,
317 watch::Receiver<ConnectionState>,
318 ),
319 display_sources: Vec<MacOSDisplay>,
320 audio_track_updates: (
321 async_broadcast::Sender<RemoteAudioTrackUpdate>,
322 async_broadcast::Receiver<RemoteAudioTrackUpdate>,
323 ),
324 video_track_updates: (
325 async_broadcast::Sender<RemoteVideoTrackUpdate>,
326 async_broadcast::Receiver<RemoteVideoTrackUpdate>,
327 ),
328}
329
/// Connection status of a `Room`.
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// The room is not connected to any server.
    Disconnected,
    /// The room joined a server; `url` and `token` are the values that were
    /// passed to `Room::connect`.
    Connected { url: String, token: String },
}
335
336pub struct Room(Mutex<RoomState>);
337
338impl Room {
339 pub fn new() -> Arc<Self> {
340 Arc::new(Self(Mutex::new(RoomState {
341 connection: watch::channel_with(ConnectionState::Disconnected),
342 display_sources: Default::default(),
343 video_track_updates: async_broadcast::broadcast(128),
344 audio_track_updates: async_broadcast::broadcast(128),
345 })))
346 }
347
348 pub fn status(&self) -> watch::Receiver<ConnectionState> {
349 self.0.lock().connection.1.clone()
350 }
351
352 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
353 let this = self.clone();
354 let url = url.to_string();
355 let token = token.to_string();
356 async move {
357 let server = TestServer::get(&url)?;
358 server.join_room(token.clone(), this.clone()).await?;
359 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
360 Ok(())
361 }
362 }
363
364 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
365 let this = self.clone();
366 async move {
367 let server = this.test_server();
368 server.background.simulate_random_delay().await;
369 Ok(this.0.lock().display_sources.clone())
370 }
371 }
372
373 pub fn publish_video_track(
374 self: &Arc<Self>,
375 track: &LocalVideoTrack,
376 ) -> impl Future<Output = Result<LocalTrackPublication>> {
377 let this = self.clone();
378 let track = track.clone();
379 async move {
380 this.test_server()
381 .publish_video_track(this.token(), track)
382 .await?;
383 Ok(LocalTrackPublication)
384 }
385 }
386 pub fn publish_audio_track(
387 self: &Arc<Self>,
388 track: &LocalAudioTrack,
389 ) -> impl Future<Output = Result<LocalTrackPublication>> {
390 let this = self.clone();
391 let track = track.clone();
392 async move {
393 this.test_server()
394 .publish_audio_track(this.token(), &track)
395 .await?;
396 Ok(LocalTrackPublication)
397 }
398 }
399
400 pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
401
402 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
403 if !self.is_connected() {
404 return Vec::new();
405 }
406
407 self.test_server()
408 .audio_tracks(self.token())
409 .unwrap()
410 .into_iter()
411 .filter(|track| track.publisher_id() == publisher_id)
412 .collect()
413 }
414
415 pub fn remote_audio_track_publications(
416 &self,
417 publisher_id: &str,
418 ) -> Vec<Arc<RemoteTrackPublication>> {
419 if !self.is_connected() {
420 return Vec::new();
421 }
422
423 self.test_server()
424 .audio_tracks(self.token())
425 .unwrap()
426 .into_iter()
427 .filter(|track| track.publisher_id() == publisher_id)
428 .map(|_track| Arc::new(RemoteTrackPublication {}))
429 .collect()
430 }
431
432 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
433 if !self.is_connected() {
434 return Vec::new();
435 }
436
437 self.test_server()
438 .video_tracks(self.token())
439 .unwrap()
440 .into_iter()
441 .filter(|track| track.publisher_id() == publisher_id)
442 .collect()
443 }
444
445 pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
446 self.0.lock().audio_track_updates.1.clone()
447 }
448
449 pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
450 self.0.lock().video_track_updates.1.clone()
451 }
452
453 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
454 self.0.lock().display_sources = sources;
455 }
456
457 fn test_server(&self) -> Arc<TestServer> {
458 match self.0.lock().connection.1.borrow().clone() {
459 ConnectionState::Disconnected => panic!("must be connected to call this method"),
460 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
461 }
462 }
463
464 fn token(&self) -> String {
465 match self.0.lock().connection.1.borrow().clone() {
466 ConnectionState::Disconnected => panic!("must be connected to call this method"),
467 ConnectionState::Connected { token, .. } => token,
468 }
469 }
470
471 fn is_connected(&self) -> bool {
472 match *self.0.lock().connection.1.borrow() {
473 ConnectionState::Disconnected => false,
474 ConnectionState::Connected { .. } => true,
475 }
476 }
477}
478
479impl Drop for Room {
480 fn drop(&mut self) {
481 if let ConnectionState::Connected { token, .. } = mem::replace(
482 &mut *self.0.lock().connection.0.borrow_mut(),
483 ConnectionState::Disconnected,
484 ) {
485 if let Ok(server) = TestServer::get(&token) {
486 let background = server.background.clone();
487 background
488 .spawn(async move { server.leave_room(token).await.unwrap() })
489 .detach();
490 }
491 }
492 }
493}
494
/// Handle returned when a local track is published; it carries no data.
pub struct LocalTrackPublication;
496
497impl LocalTrackPublication {
498 pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
499 async { Ok(()) }
500 }
501}
502
/// Handle describing a remote participant's track publication; it carries
/// no data.
pub struct RemoteTrackPublication;
504
505impl RemoteTrackPublication {
506 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
507 async { Ok(()) }
508 }
509
510 pub fn is_muted(&self) -> bool {
511 false
512 }
513
514 pub fn sid(&self) -> String {
515 "".to_string()
516 }
517}
518
519#[derive(Clone)]
520pub struct LocalVideoTrack {
521 frames_rx: async_broadcast::Receiver<Frame>,
522}
523
524impl LocalVideoTrack {
525 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
526 Self {
527 frames_rx: display.frames.1.clone(),
528 }
529 }
530}
531
/// Local audio track; it carries no data.
#[derive(Clone)]
pub struct LocalAudioTrack;
534
535impl LocalAudioTrack {
536 pub fn create() -> Self {
537 Self
538 }
539}
540
541pub struct RemoteVideoTrack {
542 sid: Sid,
543 publisher_id: Sid,
544 frames_rx: async_broadcast::Receiver<Frame>,
545}
546
547impl RemoteVideoTrack {
548 pub fn sid(&self) -> &str {
549 &self.sid
550 }
551
552 pub fn publisher_id(&self) -> &str {
553 &self.publisher_id
554 }
555
556 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
557 self.frames_rx.clone()
558 }
559}
560
561#[derive(Debug)]
562pub struct RemoteAudioTrack {
563 sid: Sid,
564 publisher_id: Sid,
565}
566
567impl RemoteAudioTrack {
568 pub fn sid(&self) -> &str {
569 &self.sid
570 }
571
572 pub fn publisher_id(&self) -> &str {
573 &self.publisher_id
574 }
575
576 pub fn enable(&self) -> impl Future<Output = Result<()>> {
577 async { Ok(()) }
578 }
579
580 pub fn disable(&self) -> impl Future<Output = Result<()>> {
581 async { Ok(()) }
582 }
583}
584
585#[derive(Clone)]
586pub enum RemoteVideoTrackUpdate {
587 Subscribed(Arc<RemoteVideoTrack>),
588 Unsubscribed { publisher_id: Sid, track_id: Sid },
589}
590
591#[derive(Clone)]
592pub enum RemoteAudioTrackUpdate {
593 ActiveSpeakersChanged { speakers: Vec<Sid> },
594 MuteChanged { track_id: Sid, muted: bool },
595 Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
596 Unsubscribed { publisher_id: Sid, track_id: Sid },
597}
598
599#[derive(Clone)]
600pub struct MacOSDisplay {
601 frames: (
602 async_broadcast::Sender<Frame>,
603 async_broadcast::Receiver<Frame>,
604 ),
605}
606
607impl MacOSDisplay {
608 pub fn new() -> Self {
609 Self {
610 frames: async_broadcast::broadcast(128),
611 }
612 }
613
614 pub fn send_frame(&self, frame: Frame) {
615 self.frames.0.try_broadcast(frame).unwrap();
616 }
617}
618
/// Fake video frame, described only by a label and its dimensions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    /// Free-form label used to tell frames apart.
    pub label: String,
    /// Width of the frame.
    pub width: usize,
    /// Height of the frame.
    pub height: usize,
}
625
626impl Frame {
627 pub fn width(&self) -> usize {
628 self.width
629 }
630
631 pub fn height(&self) -> usize {
632 self.height
633 }
634
635 pub fn image(&self) -> CVImageBuffer {
636 unimplemented!("you can't call this in test mode")
637 }
638}