1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use collections::HashMap;
4use futures::Stream;
5use gpui::executor::Background;
6use lazy_static::lazy_static;
7use live_kit_server::token;
8use media::core_video::CVImageBuffer;
9use parking_lot::Mutex;
10use postage::watch;
11use std::{future::Future, mem, sync::Arc};
12
13lazy_static! {
14 static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
15}
16
17pub struct TestServer {
18 pub url: String,
19 pub api_key: String,
20 pub secret_key: String,
21 rooms: Mutex<HashMap<String, TestServerRoom>>,
22 background: Arc<Background>,
23}
24
25impl TestServer {
26 pub fn create(
27 url: String,
28 api_key: String,
29 secret_key: String,
30 background: Arc<Background>,
31 ) -> Result<Arc<TestServer>> {
32 let mut servers = SERVERS.lock();
33 if servers.contains_key(&url) {
34 Err(anyhow!("a server with url {:?} already exists", url))
35 } else {
36 let server = Arc::new(TestServer {
37 url: url.clone(),
38 api_key,
39 secret_key,
40 rooms: Default::default(),
41 background,
42 });
43 servers.insert(url, server.clone());
44 Ok(server)
45 }
46 }
47
48 fn get(url: &str) -> Result<Arc<TestServer>> {
49 Ok(SERVERS
50 .lock()
51 .get(url)
52 .ok_or_else(|| anyhow!("no server found for url"))?
53 .clone())
54 }
55
56 pub fn teardown(&self) -> Result<()> {
57 SERVERS
58 .lock()
59 .remove(&self.url)
60 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
61 Ok(())
62 }
63
64 pub fn create_api_client(&self) -> TestApiClient {
65 TestApiClient {
66 url: self.url.clone(),
67 }
68 }
69
70 pub async fn create_room(&self, room: String) -> Result<()> {
71 self.background.simulate_random_delay().await;
72 let mut server_rooms = self.rooms.lock();
73 if server_rooms.contains_key(&room) {
74 Err(anyhow!("room {:?} already exists", room))
75 } else {
76 server_rooms.insert(room, Default::default());
77 Ok(())
78 }
79 }
80
81 async fn delete_room(&self, room: String) -> Result<()> {
82 // TODO: clear state associated with all `Room`s.
83 self.background.simulate_random_delay().await;
84 let mut server_rooms = self.rooms.lock();
85 server_rooms
86 .remove(&room)
87 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
88 Ok(())
89 }
90
91 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
92 self.background.simulate_random_delay().await;
93 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
94 let identity = claims.sub.unwrap().to_string();
95 let room_name = claims.video.room.unwrap();
96 let mut server_rooms = self.rooms.lock();
97 let room = server_rooms
98 .get_mut(&*room_name)
99 .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
100 if room.client_rooms.contains_key(&identity) {
101 Err(anyhow!(
102 "{:?} attempted to join room {:?} twice",
103 identity,
104 room_name
105 ))
106 } else {
107 for track in &room.video_tracks {
108 client_room
109 .0
110 .lock()
111 .video_track_updates
112 .0
113 .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
114 .unwrap();
115 }
116 room.client_rooms.insert(identity, client_room);
117 Ok(())
118 }
119 }
120
121 async fn leave_room(&self, token: String) -> Result<()> {
122 self.background.simulate_random_delay().await;
123 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
124 let identity = claims.sub.unwrap().to_string();
125 let room_name = claims.video.room.unwrap();
126 let mut server_rooms = self.rooms.lock();
127 let room = server_rooms
128 .get_mut(&*room_name)
129 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
130 room.client_rooms.remove(&identity).ok_or_else(|| {
131 anyhow!(
132 "{:?} attempted to leave room {:?} before joining it",
133 identity,
134 room_name
135 )
136 })?;
137 Ok(())
138 }
139
140 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
141 // TODO: clear state associated with the `Room`.
142
143 self.background.simulate_random_delay().await;
144 let mut server_rooms = self.rooms.lock();
145 let room = server_rooms
146 .get_mut(&room_name)
147 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
148 room.client_rooms.remove(&identity).ok_or_else(|| {
149 anyhow!(
150 "participant {:?} did not join room {:?}",
151 identity,
152 room_name
153 )
154 })?;
155 Ok(())
156 }
157
158 pub async fn disconnect_client(&self, client_identity: String) {
159 self.background.simulate_random_delay().await;
160 let mut server_rooms = self.rooms.lock();
161 for room in server_rooms.values_mut() {
162 if let Some(room) = room.client_rooms.remove(&client_identity) {
163 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
164 }
165 }
166 }
167
168 async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
169 self.background.simulate_random_delay().await;
170 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
171 let identity = claims.sub.unwrap().to_string();
172 let room_name = claims.video.room.unwrap();
173
174 let mut server_rooms = self.rooms.lock();
175 let room = server_rooms
176 .get_mut(&*room_name)
177 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
178
179 let track = Arc::new(RemoteVideoTrack {
180 sid: nanoid::nanoid!(17),
181 publisher_id: identity.clone(),
182 frames_rx: local_track.frames_rx.clone(),
183 });
184
185 room.video_tracks.push(track.clone());
186
187 for (id, client_room) in &room.client_rooms {
188 if *id != identity {
189 let _ = client_room
190 .0
191 .lock()
192 .video_track_updates
193 .0
194 .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
195 .unwrap();
196 }
197 }
198
199 Ok(())
200 }
201
202 async fn publish_audio_track(
203 &self,
204 token: String,
205 _local_track: &LocalAudioTrack,
206 ) -> Result<()> {
207 self.background.simulate_random_delay().await;
208 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
209 let identity = claims.sub.unwrap().to_string();
210 let room_name = claims.video.room.unwrap();
211
212 let mut server_rooms = self.rooms.lock();
213 let room = server_rooms
214 .get_mut(&*room_name)
215 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
216
217 let track = Arc::new(RemoteAudioTrack {
218 sid: nanoid::nanoid!(17),
219 publisher_id: identity.clone(),
220 });
221
222 room.audio_tracks.push(track.clone());
223
224 for (id, client_room) in &room.client_rooms {
225 if *id != identity {
226 let _ = client_room
227 .0
228 .lock()
229 .audio_track_updates
230 .0
231 .try_broadcast(RemoteAudioTrackUpdate::Subscribed(track.clone()))
232 .unwrap();
233 }
234 }
235
236 Ok(())
237 }
238
239 fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
240 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
241 let room_name = claims.video.room.unwrap();
242
243 let mut server_rooms = self.rooms.lock();
244 let room = server_rooms
245 .get_mut(&*room_name)
246 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
247 Ok(room.video_tracks.clone())
248 }
249
250 fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
251 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
252 let room_name = claims.video.room.unwrap();
253
254 let mut server_rooms = self.rooms.lock();
255 let room = server_rooms
256 .get_mut(&*room_name)
257 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
258 Ok(room.audio_tracks.clone())
259 }
260}
261
262#[derive(Default)]
263struct TestServerRoom {
264 client_rooms: HashMap<Sid, Arc<Room>>,
265 video_tracks: Vec<Arc<RemoteVideoTrack>>,
266 audio_tracks: Vec<Arc<RemoteAudioTrack>>,
267}
268
269impl TestServerRoom {}
270
271pub struct TestApiClient {
272 url: String,
273}
274
275#[async_trait]
276impl live_kit_server::api::Client for TestApiClient {
277 fn url(&self) -> &str {
278 &self.url
279 }
280
281 async fn create_room(&self, name: String) -> Result<()> {
282 let server = TestServer::get(&self.url)?;
283 server.create_room(name).await?;
284 Ok(())
285 }
286
287 async fn delete_room(&self, name: String) -> Result<()> {
288 let server = TestServer::get(&self.url)?;
289 server.delete_room(name).await?;
290 Ok(())
291 }
292
293 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
294 let server = TestServer::get(&self.url)?;
295 server.remove_participant(room, identity).await?;
296 Ok(())
297 }
298
299 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
300 let server = TestServer::get(&self.url)?;
301 token::create(
302 &server.api_key,
303 &server.secret_key,
304 Some(identity),
305 token::VideoGrant::to_join(room),
306 )
307 }
308}
309
/// Identifier type used in this file for track ids and participant/publisher ids.
pub type Sid = String;
311
312struct RoomState {
313 connection: (
314 watch::Sender<ConnectionState>,
315 watch::Receiver<ConnectionState>,
316 ),
317 display_sources: Vec<MacOSDisplay>,
318 audio_track_updates: (
319 async_broadcast::Sender<RemoteAudioTrackUpdate>,
320 async_broadcast::Receiver<RemoteAudioTrackUpdate>,
321 ),
322 video_track_updates: (
323 async_broadcast::Sender<RemoteVideoTrackUpdate>,
324 async_broadcast::Receiver<RemoteVideoTrackUpdate>,
325 ),
326}
327
/// Whether a `Room` is currently attached to a server.
#[derive(Clone, PartialEq, Eq)]
pub enum ConnectionState {
    /// Not attached to any server.
    Disconnected,
    /// Attached to the server at `url`, authenticated with `token`.
    Connected { url: String, token: String },
}
333
334pub struct Room(Mutex<RoomState>);
335
336impl Room {
337 pub fn new() -> Arc<Self> {
338 Arc::new(Self(Mutex::new(RoomState {
339 connection: watch::channel_with(ConnectionState::Disconnected),
340 display_sources: Default::default(),
341 video_track_updates: async_broadcast::broadcast(128),
342 audio_track_updates: async_broadcast::broadcast(128),
343 })))
344 }
345
346 pub fn status(&self) -> watch::Receiver<ConnectionState> {
347 self.0.lock().connection.1.clone()
348 }
349
350 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
351 let this = self.clone();
352 let url = url.to_string();
353 let token = token.to_string();
354 async move {
355 let server = TestServer::get(&url)?;
356 server.join_room(token.clone(), this.clone()).await?;
357 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
358 Ok(())
359 }
360 }
361
362 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
363 let this = self.clone();
364 async move {
365 let server = this.test_server();
366 server.background.simulate_random_delay().await;
367 Ok(this.0.lock().display_sources.clone())
368 }
369 }
370
371 pub fn publish_video_track(
372 self: &Arc<Self>,
373 track: &LocalVideoTrack,
374 ) -> impl Future<Output = Result<LocalTrackPublication>> {
375 let this = self.clone();
376 let track = track.clone();
377 async move {
378 this.test_server()
379 .publish_video_track(this.token(), track)
380 .await?;
381 Ok(LocalTrackPublication)
382 }
383 }
384 pub fn publish_audio_track(
385 self: &Arc<Self>,
386 track: &LocalAudioTrack,
387 ) -> impl Future<Output = Result<LocalTrackPublication>> {
388 let this = self.clone();
389 let track = track.clone();
390 async move {
391 this.test_server()
392 .publish_audio_track(this.token(), &track)
393 .await?;
394 Ok(LocalTrackPublication)
395 }
396 }
397
398 pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
399
400 pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
401 if !self.is_connected() {
402 return Vec::new();
403 }
404
405 self.test_server()
406 .audio_tracks(self.token())
407 .unwrap()
408 .into_iter()
409 .filter(|track| track.publisher_id() == publisher_id)
410 .collect()
411 }
412
413 pub fn remote_audio_track_publications(
414 &self,
415 publisher_id: &str,
416 ) -> Vec<Arc<RemoteTrackPublication>> {
417 if !self.is_connected() {
418 return Vec::new();
419 }
420
421 self.test_server()
422 .audio_tracks(self.token())
423 .unwrap()
424 .into_iter()
425 .filter(|track| track.publisher_id() == publisher_id)
426 .map(|_track| Arc::new(RemoteTrackPublication {}))
427 .collect()
428 }
429
430 pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
431 if !self.is_connected() {
432 return Vec::new();
433 }
434
435 self.test_server()
436 .video_tracks(self.token())
437 .unwrap()
438 .into_iter()
439 .filter(|track| track.publisher_id() == publisher_id)
440 .collect()
441 }
442
443 pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
444 self.0.lock().audio_track_updates.1.clone()
445 }
446
447 pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
448 self.0.lock().video_track_updates.1.clone()
449 }
450
451 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
452 self.0.lock().display_sources = sources;
453 }
454
455 fn test_server(&self) -> Arc<TestServer> {
456 match self.0.lock().connection.1.borrow().clone() {
457 ConnectionState::Disconnected => panic!("must be connected to call this method"),
458 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
459 }
460 }
461
462 fn token(&self) -> String {
463 match self.0.lock().connection.1.borrow().clone() {
464 ConnectionState::Disconnected => panic!("must be connected to call this method"),
465 ConnectionState::Connected { token, .. } => token,
466 }
467 }
468
469 fn is_connected(&self) -> bool {
470 match *self.0.lock().connection.1.borrow() {
471 ConnectionState::Disconnected => false,
472 ConnectionState::Connected { .. } => true,
473 }
474 }
475}
476
477impl Drop for Room {
478 fn drop(&mut self) {
479 if let ConnectionState::Connected { token, .. } = mem::replace(
480 &mut *self.0.lock().connection.0.borrow_mut(),
481 ConnectionState::Disconnected,
482 ) {
483 if let Ok(server) = TestServer::get(&token) {
484 let background = server.background.clone();
485 background
486 .spawn(async move { server.leave_room(token).await.unwrap() })
487 .detach();
488 }
489 }
490 }
491}
492
493pub struct LocalTrackPublication;
494
495impl LocalTrackPublication {
496 pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
497 async { Ok(()) }
498 }
499}
500
501pub struct RemoteTrackPublication;
502
503impl RemoteTrackPublication {
504 pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
505 async { Ok(()) }
506 }
507}
508
509#[derive(Clone)]
510pub struct LocalVideoTrack {
511 frames_rx: async_broadcast::Receiver<Frame>,
512}
513
514impl LocalVideoTrack {
515 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
516 Self {
517 frames_rx: display.frames.1.clone(),
518 }
519 }
520}
521
/// Local audio track placeholder; carries no data in tests.
#[derive(Clone)]
pub struct LocalAudioTrack;

impl LocalAudioTrack {
    /// Creates the (stateless) audio track.
    pub fn create() -> Self {
        LocalAudioTrack
    }
}
530
531pub struct RemoteVideoTrack {
532 sid: Sid,
533 publisher_id: Sid,
534 frames_rx: async_broadcast::Receiver<Frame>,
535}
536
537impl RemoteVideoTrack {
538 pub fn sid(&self) -> &str {
539 &self.sid
540 }
541
542 pub fn publisher_id(&self) -> &str {
543 &self.publisher_id
544 }
545
546 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
547 self.frames_rx.clone()
548 }
549}
550
551#[derive(Debug)]
552pub struct RemoteAudioTrack {
553 sid: Sid,
554 publisher_id: Sid,
555}
556
557impl RemoteAudioTrack {
558 pub fn sid(&self) -> &str {
559 &self.sid
560 }
561
562 pub fn publisher_id(&self) -> &str {
563 &self.publisher_id
564 }
565
566 pub fn enable(&self) -> impl Future<Output = Result<()>> {
567 async { Ok(()) }
568 }
569
570 pub fn disable(&self) -> impl Future<Output = Result<()>> {
571 async { Ok(()) }
572 }
573}
574
575#[derive(Clone)]
576pub enum RemoteVideoTrackUpdate {
577 Subscribed(Arc<RemoteVideoTrack>),
578 Unsubscribed { publisher_id: Sid, track_id: Sid },
579}
580
581#[derive(Clone)]
582pub enum RemoteAudioTrackUpdate {
583 ActiveSpeakersChanged { speakers: Vec<Sid> },
584 MuteChanged { track_id: Sid, muted: bool },
585 Subscribed(Arc<RemoteAudioTrack>),
586 Unsubscribed { publisher_id: Sid, track_id: Sid },
587}
588
589#[derive(Clone)]
590pub struct MacOSDisplay {
591 frames: (
592 async_broadcast::Sender<Frame>,
593 async_broadcast::Receiver<Frame>,
594 ),
595}
596
597impl MacOSDisplay {
598 pub fn new() -> Self {
599 Self {
600 frames: async_broadcast::broadcast(128),
601 }
602 }
603
604 pub fn send_frame(&self, frame: Frame) {
605 self.frames.0.try_broadcast(frame).unwrap();
606 }
607}
608
/// Fake video frame: a label plus dimensions, with no pixel data.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Frame {
    /// Free-form label for the frame.
    pub label: String,
    /// Value returned by `Frame::width`.
    pub width: usize,
    /// Value returned by `Frame::height`.
    pub height: usize,
}
615
616impl Frame {
617 pub fn width(&self) -> usize {
618 self.width
619 }
620
621 pub fn height(&self) -> usize {
622 self.height
623 }
624
625 pub fn image(&self) -> CVImageBuffer {
626 unimplemented!("you can't call this in test mode")
627 }
628}