1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use collections::HashMap;
4use futures::Stream;
5use gpui::executor::Background;
6use lazy_static::lazy_static;
7use live_kit_server::token;
8use media::core_video::CVImageBuffer;
9use parking_lot::Mutex;
10use postage::watch;
11use std::{future::Future, mem, sync::Arc};
12
13lazy_static! {
14 static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
15}
16
17pub struct TestServer {
18 pub url: String,
19 pub api_key: String,
20 pub secret_key: String,
21 rooms: Mutex<HashMap<String, TestServerRoom>>,
22 background: Arc<Background>,
23}
24
25impl TestServer {
26 pub fn create(
27 url: String,
28 api_key: String,
29 secret_key: String,
30 background: Arc<Background>,
31 ) -> Result<Arc<TestServer>> {
32 let mut servers = SERVERS.lock();
33 if servers.contains_key(&url) {
34 Err(anyhow!("a server with url {:?} already exists", url))
35 } else {
36 let server = Arc::new(TestServer {
37 url: url.clone(),
38 api_key,
39 secret_key,
40 rooms: Default::default(),
41 background,
42 });
43 servers.insert(url, server.clone());
44 Ok(server)
45 }
46 }
47
48 fn get(url: &str) -> Result<Arc<TestServer>> {
49 Ok(SERVERS
50 .lock()
51 .get(url)
52 .ok_or_else(|| anyhow!("no server found for url"))?
53 .clone())
54 }
55
56 pub fn teardown(&self) -> Result<()> {
57 SERVERS
58 .lock()
59 .remove(&self.url)
60 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
61 Ok(())
62 }
63
64 pub fn create_api_client(&self) -> TestApiClient {
65 TestApiClient {
66 url: self.url.clone(),
67 }
68 }
69
70 async fn create_room(&self, room: String) -> Result<()> {
71 self.background.simulate_random_delay().await;
72 let mut server_rooms = self.rooms.lock();
73 if server_rooms.contains_key(&room) {
74 Err(anyhow!("room {:?} already exists", room))
75 } else {
76 server_rooms.insert(room, Default::default());
77 Ok(())
78 }
79 }
80
81 async fn delete_room(&self, room: String) -> Result<()> {
82 // TODO: clear state associated with all `Room`s.
83 self.background.simulate_random_delay().await;
84 let mut server_rooms = self.rooms.lock();
85 server_rooms
86 .remove(&room)
87 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
88 Ok(())
89 }
90
91 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
92 self.background.simulate_random_delay().await;
93 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
94 let identity = claims.sub.unwrap().to_string();
95 let room_name = claims.video.room.unwrap();
96 let mut server_rooms = self.rooms.lock();
97 let room = server_rooms
98 .get_mut(&*room_name)
99 .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
100 if room.client_rooms.contains_key(&identity) {
101 Err(anyhow!(
102 "{:?} attempted to join room {:?} twice",
103 identity,
104 room_name
105 ))
106 } else {
107 room.client_rooms.insert(identity, client_room);
108 Ok(())
109 }
110 }
111
112 async fn leave_room(&self, token: String) -> Result<()> {
113 self.background.simulate_random_delay().await;
114 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
115 let identity = claims.sub.unwrap().to_string();
116 let room_name = claims.video.room.unwrap();
117 let mut server_rooms = self.rooms.lock();
118 let room = server_rooms
119 .get_mut(&*room_name)
120 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
121 room.client_rooms.remove(&identity).ok_or_else(|| {
122 anyhow!(
123 "{:?} attempted to leave room {:?} before joining it",
124 identity,
125 room_name
126 )
127 })?;
128 Ok(())
129 }
130
131 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
132 // TODO: clear state associated with the `Room`.
133
134 self.background.simulate_random_delay().await;
135 let mut server_rooms = self.rooms.lock();
136 let room = server_rooms
137 .get_mut(&room_name)
138 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
139 room.client_rooms.remove(&identity).ok_or_else(|| {
140 anyhow!(
141 "participant {:?} did not join room {:?}",
142 identity,
143 room_name
144 )
145 })?;
146 Ok(())
147 }
148
149 pub async fn disconnect_client(&self, client_identity: String) {
150 self.background.simulate_random_delay().await;
151 let mut server_rooms = self.rooms.lock();
152 for room in server_rooms.values_mut() {
153 if let Some(room) = room.client_rooms.remove(&client_identity) {
154 *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
155 }
156 }
157 }
158
159 async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
160 self.background.simulate_random_delay().await;
161 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
162 let identity = claims.sub.unwrap().to_string();
163 let room_name = claims.video.room.unwrap();
164
165 let mut server_rooms = self.rooms.lock();
166 let room = server_rooms
167 .get_mut(&*room_name)
168 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
169
170 let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
171 sid: nanoid::nanoid!(17),
172 publisher_id: identity.clone(),
173 frames_rx: local_track.frames_rx.clone(),
174 }));
175
176 for (id, client_room) in &room.client_rooms {
177 if *id != identity {
178 let _ = client_room
179 .0
180 .lock()
181 .video_track_updates
182 .0
183 .try_broadcast(update.clone())
184 .unwrap();
185 }
186 }
187
188 Ok(())
189 }
190}
191
192#[derive(Default)]
193struct TestServerRoom {
194 client_rooms: HashMap<Sid, Arc<Room>>,
195}
196
197impl TestServerRoom {}
198
199pub struct TestApiClient {
200 url: String,
201}
202
203#[async_trait]
204impl live_kit_server::api::Client for TestApiClient {
205 fn url(&self) -> &str {
206 &self.url
207 }
208
209 async fn create_room(&self, name: String) -> Result<()> {
210 let server = TestServer::get(&self.url)?;
211 server.create_room(name).await?;
212 Ok(())
213 }
214
215 async fn delete_room(&self, name: String) -> Result<()> {
216 let server = TestServer::get(&self.url)?;
217 server.delete_room(name).await?;
218 Ok(())
219 }
220
221 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
222 let server = TestServer::get(&self.url)?;
223 server.remove_participant(room, identity).await?;
224 Ok(())
225 }
226
227 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
228 let server = TestServer::get(&self.url)?;
229 token::create(
230 &server.api_key,
231 &server.secret_key,
232 Some(identity),
233 token::VideoGrant::to_join(room),
234 )
235 }
236}
237
238pub type Sid = String;
239
240struct RoomState {
241 connection: (
242 watch::Sender<ConnectionState>,
243 watch::Receiver<ConnectionState>,
244 ),
245 display_sources: Vec<MacOSDisplay>,
246 video_track_updates: (
247 async_broadcast::Sender<RemoteVideoTrackUpdate>,
248 async_broadcast::Receiver<RemoteVideoTrackUpdate>,
249 ),
250}
251
252#[derive(Clone, Eq, PartialEq)]
253pub enum ConnectionState {
254 Disconnected,
255 Connected { url: String, token: String },
256}
257
258pub struct Room(Mutex<RoomState>);
259
260impl Room {
261 pub fn new() -> Arc<Self> {
262 Arc::new(Self(Mutex::new(RoomState {
263 connection: watch::channel_with(ConnectionState::Disconnected),
264 display_sources: Default::default(),
265 video_track_updates: async_broadcast::broadcast(128),
266 })))
267 }
268
269 pub fn status(&self) -> watch::Receiver<ConnectionState> {
270 self.0.lock().connection.1.clone()
271 }
272
273 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
274 let this = self.clone();
275 let url = url.to_string();
276 let token = token.to_string();
277 async move {
278 let server = TestServer::get(&url)?;
279 server.join_room(token.clone(), this.clone()).await?;
280 *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
281 Ok(())
282 }
283 }
284
285 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
286 let this = self.clone();
287 async move {
288 let server = this.test_server();
289 server.background.simulate_random_delay().await;
290 Ok(this.0.lock().display_sources.clone())
291 }
292 }
293
294 pub fn publish_video_track(
295 self: &Arc<Self>,
296 track: &LocalVideoTrack,
297 ) -> impl Future<Output = Result<LocalTrackPublication>> {
298 let this = self.clone();
299 let track = track.clone();
300 async move {
301 this.test_server()
302 .publish_video_track(this.token(), track)
303 .await?;
304 Ok(LocalTrackPublication)
305 }
306 }
307
308 pub fn unpublish_track(&self, _: LocalTrackPublication) {}
309
310 pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
311 Default::default()
312 }
313
314 pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
315 self.0.lock().video_track_updates.1.clone()
316 }
317
318 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
319 self.0.lock().display_sources = sources;
320 }
321
322 fn test_server(&self) -> Arc<TestServer> {
323 match self.0.lock().connection.1.borrow().clone() {
324 ConnectionState::Disconnected => panic!("must be connected to call this method"),
325 ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
326 }
327 }
328
329 fn token(&self) -> String {
330 match self.0.lock().connection.1.borrow().clone() {
331 ConnectionState::Disconnected => panic!("must be connected to call this method"),
332 ConnectionState::Connected { token, .. } => token,
333 }
334 }
335}
336
337impl Drop for Room {
338 fn drop(&mut self) {
339 if let ConnectionState::Connected { token, .. } = mem::replace(
340 &mut *self.0.lock().connection.0.borrow_mut(),
341 ConnectionState::Disconnected,
342 ) {
343 if let Ok(server) = TestServer::get(&token) {
344 let background = server.background.clone();
345 background
346 .spawn(async move { server.leave_room(token).await.unwrap() })
347 .detach();
348 }
349 }
350 }
351}
352
353pub struct LocalTrackPublication;
354
355#[derive(Clone)]
356pub struct LocalVideoTrack {
357 frames_rx: async_broadcast::Receiver<Frame>,
358}
359
360impl LocalVideoTrack {
361 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
362 Self {
363 frames_rx: display.frames.1.clone(),
364 }
365 }
366}
367
368pub struct RemoteVideoTrack {
369 sid: Sid,
370 publisher_id: Sid,
371 frames_rx: async_broadcast::Receiver<Frame>,
372}
373
374impl RemoteVideoTrack {
375 pub fn sid(&self) -> &str {
376 &self.sid
377 }
378
379 pub fn publisher_id(&self) -> &str {
380 &self.publisher_id
381 }
382
383 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
384 self.frames_rx.clone()
385 }
386}
387
388#[derive(Clone)]
389pub enum RemoteVideoTrackUpdate {
390 Subscribed(Arc<RemoteVideoTrack>),
391 Unsubscribed { publisher_id: Sid, track_id: Sid },
392}
393
394#[derive(Clone)]
395pub struct MacOSDisplay {
396 frames: (
397 async_broadcast::Sender<Frame>,
398 async_broadcast::Receiver<Frame>,
399 ),
400}
401
402impl MacOSDisplay {
403 pub fn new() -> Self {
404 Self {
405 frames: async_broadcast::broadcast(128),
406 }
407 }
408
409 pub fn send_frame(&self, frame: Frame) {
410 self.frames.0.try_broadcast(frame).unwrap();
411 }
412}
413
414#[derive(Clone, Debug, PartialEq, Eq)]
415pub struct Frame {
416 pub label: String,
417 pub width: usize,
418 pub height: usize,
419}
420
421impl Frame {
422 pub fn width(&self) -> usize {
423 self.width
424 }
425
426 pub fn height(&self) -> usize {
427 self.height
428 }
429
430 pub fn image(&self) -> CVImageBuffer {
431 unimplemented!("you can't call this in test mode")
432 }
433}