1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use collections::HashMap;
4use futures::Stream;
5use gpui::executor::Background;
6use lazy_static::lazy_static;
7use live_kit_server::token;
8use media::core_video::CVImageBuffer;
9use parking_lot::Mutex;
10use std::{future::Future, sync::Arc};
11
12lazy_static! {
13 static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
14}
15
16pub struct TestServer {
17 pub url: String,
18 pub api_key: String,
19 pub secret_key: String,
20 rooms: Mutex<HashMap<String, TestServerRoom>>,
21 background: Arc<Background>,
22}
23
24impl TestServer {
25 pub fn create(
26 url: String,
27 api_key: String,
28 secret_key: String,
29 background: Arc<Background>,
30 ) -> Result<Arc<TestServer>> {
31 let mut servers = SERVERS.lock();
32 if servers.contains_key(&url) {
33 Err(anyhow!("a server with url {:?} already exists", url))
34 } else {
35 let server = Arc::new(TestServer {
36 url: url.clone(),
37 api_key,
38 secret_key,
39 rooms: Default::default(),
40 background,
41 });
42 servers.insert(url, server.clone());
43 Ok(server)
44 }
45 }
46
47 fn get(url: &str) -> Result<Arc<TestServer>> {
48 Ok(SERVERS
49 .lock()
50 .get(url)
51 .ok_or_else(|| anyhow!("no server found for url"))?
52 .clone())
53 }
54
55 pub fn teardown(&self) -> Result<()> {
56 SERVERS
57 .lock()
58 .remove(&self.url)
59 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
60 Ok(())
61 }
62
63 pub fn create_api_client(&self) -> TestApiClient {
64 TestApiClient {
65 url: self.url.clone(),
66 }
67 }
68
69 async fn create_room(&self, room: String) -> Result<()> {
70 self.background.simulate_random_delay().await;
71 let mut server_rooms = self.rooms.lock();
72 if server_rooms.contains_key(&room) {
73 Err(anyhow!("room {:?} already exists", room))
74 } else {
75 server_rooms.insert(room, Default::default());
76 Ok(())
77 }
78 }
79
80 async fn delete_room(&self, room: String) -> Result<()> {
81 // TODO: clear state associated with all `Room`s.
82 self.background.simulate_random_delay().await;
83 let mut server_rooms = self.rooms.lock();
84 server_rooms
85 .remove(&room)
86 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
87 Ok(())
88 }
89
90 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
91 self.background.simulate_random_delay().await;
92 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
93 let identity = claims.sub.unwrap().to_string();
94 let room_name = claims.video.room.unwrap();
95 let mut server_rooms = self.rooms.lock();
96 let room = server_rooms
97 .get_mut(&*room_name)
98 .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
99 if room.client_rooms.contains_key(&identity) {
100 Err(anyhow!(
101 "{:?} attempted to join room {:?} twice",
102 identity,
103 room_name
104 ))
105 } else {
106 room.client_rooms.insert(identity, client_room);
107 Ok(())
108 }
109 }
110
111 async fn leave_room(&self, token: String) -> Result<()> {
112 self.background.simulate_random_delay().await;
113 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
114 let identity = claims.sub.unwrap().to_string();
115 let room_name = claims.video.room.unwrap();
116 let mut server_rooms = self.rooms.lock();
117 let room = server_rooms
118 .get_mut(&*room_name)
119 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
120 room.client_rooms.remove(&identity).ok_or_else(|| {
121 anyhow!(
122 "{:?} attempted to leave room {:?} before joining it",
123 identity,
124 room_name
125 )
126 })?;
127 Ok(())
128 }
129
130 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
131 // TODO: clear state associated with the `Room`.
132
133 self.background.simulate_random_delay().await;
134 let mut server_rooms = self.rooms.lock();
135 let room = server_rooms
136 .get_mut(&room_name)
137 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
138 room.client_rooms.remove(&identity).ok_or_else(|| {
139 anyhow!(
140 "participant {:?} did not join room {:?}",
141 identity,
142 room_name
143 )
144 })?;
145 Ok(())
146 }
147
148 async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
149 self.background.simulate_random_delay().await;
150 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
151 let identity = claims.sub.unwrap().to_string();
152 let room_name = claims.video.room.unwrap();
153
154 let mut server_rooms = self.rooms.lock();
155 let room = server_rooms
156 .get_mut(&*room_name)
157 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
158
159 let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
160 sid: nanoid::nanoid!(17),
161 publisher_id: identity.clone(),
162 frames_rx: local_track.frames_rx.clone(),
163 }));
164
165 for (id, client_room) in &room.client_rooms {
166 if *id != identity {
167 let _ = client_room
168 .0
169 .lock()
170 .video_track_updates
171 .0
172 .try_broadcast(update.clone())
173 .unwrap();
174 }
175 }
176
177 Ok(())
178 }
179}
180
181#[derive(Default)]
182struct TestServerRoom {
183 client_rooms: HashMap<Sid, Arc<Room>>,
184}
185
186impl TestServerRoom {}
187
/// API client handle that refers to a `TestServer` by its URL.
pub struct TestApiClient {
    /// URL of the server this client talks to.
    url: String,
}
191
192#[async_trait]
193impl live_kit_server::api::Client for TestApiClient {
194 fn url(&self) -> &str {
195 &self.url
196 }
197
198 async fn create_room(&self, name: String) -> Result<()> {
199 let server = TestServer::get(&self.url)?;
200 server.create_room(name).await?;
201 Ok(())
202 }
203
204 async fn delete_room(&self, name: String) -> Result<()> {
205 let server = TestServer::get(&self.url)?;
206 server.delete_room(name).await?;
207 Ok(())
208 }
209
210 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
211 let server = TestServer::get(&self.url)?;
212 server.remove_participant(room, identity).await?;
213 Ok(())
214 }
215
216 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
217 let server = TestServer::get(&self.url)?;
218 token::create(
219 &server.api_key,
220 &server.secret_key,
221 Some(identity),
222 token::VideoGrant::to_join(room),
223 )
224 }
225}
226
/// String identifier used for participant identities and track ids.
pub type Sid = String;
228
229struct RoomState {
230 connection: Option<ConnectionState>,
231 display_sources: Vec<MacOSDisplay>,
232 video_track_updates: (
233 async_broadcast::Sender<RemoteVideoTrackUpdate>,
234 async_broadcast::Receiver<RemoteVideoTrackUpdate>,
235 ),
236}
237
/// What a connected `Room` remembers about its connection.
struct ConnectionState {
    /// URL of the server the room joined.
    url: String,
    /// Token the room joined with.
    token: String,
}
242
243pub struct Room(Mutex<RoomState>);
244
245impl Room {
246 pub fn new() -> Arc<Self> {
247 Arc::new(Self(Mutex::new(RoomState {
248 connection: None,
249 display_sources: Default::default(),
250 video_track_updates: async_broadcast::broadcast(128),
251 })))
252 }
253
254 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
255 let this = self.clone();
256 let url = url.to_string();
257 let token = token.to_string();
258 async move {
259 let server = TestServer::get(&url)?;
260 server.join_room(token.clone(), this.clone()).await?;
261 this.0.lock().connection = Some(ConnectionState { url, token });
262 Ok(())
263 }
264 }
265
266 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
267 let this = self.clone();
268 async move {
269 let server = this.test_server();
270 server.background.simulate_random_delay().await;
271 Ok(this.0.lock().display_sources.clone())
272 }
273 }
274
275 pub fn publish_video_track(
276 self: &Arc<Self>,
277 track: &LocalVideoTrack,
278 ) -> impl Future<Output = Result<LocalTrackPublication>> {
279 let this = self.clone();
280 let track = track.clone();
281 async move {
282 this.test_server()
283 .publish_video_track(this.token(), track)
284 .await?;
285 Ok(LocalTrackPublication)
286 }
287 }
288
289 pub fn unpublish_track(&self, _: LocalTrackPublication) {}
290
291 pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
292 Default::default()
293 }
294
295 pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
296 self.0.lock().video_track_updates.1.clone()
297 }
298
299 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
300 self.0.lock().display_sources = sources;
301 }
302
303 fn test_server(&self) -> Arc<TestServer> {
304 let this = self.0.lock();
305 let connection = this
306 .connection
307 .as_ref()
308 .expect("must be connected to call this method");
309 TestServer::get(&connection.url).unwrap()
310 }
311
312 fn token(&self) -> String {
313 self.0
314 .lock()
315 .connection
316 .as_ref()
317 .expect("must be connected to call this method")
318 .token
319 .clone()
320 }
321}
322
323impl Drop for Room {
324 fn drop(&mut self) {
325 if let Some(connection) = self.0.lock().connection.take() {
326 if let Ok(server) = TestServer::get(&connection.token) {
327 let background = server.background.clone();
328 background
329 .spawn(async move { server.leave_room(connection.token).await.unwrap() })
330 .detach();
331 }
332 }
333 }
334}
335
/// Marker value returned by `Room::publish_video_track`; it carries no data.
pub struct LocalTrackPublication;
337
338#[derive(Clone)]
339pub struct LocalVideoTrack {
340 frames_rx: async_broadcast::Receiver<Frame>,
341}
342
343impl LocalVideoTrack {
344 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
345 Self {
346 frames_rx: display.frames.1.clone(),
347 }
348 }
349}
350
351pub struct RemoteVideoTrack {
352 sid: Sid,
353 publisher_id: Sid,
354 frames_rx: async_broadcast::Receiver<Frame>,
355}
356
357impl RemoteVideoTrack {
358 pub fn sid(&self) -> &str {
359 &self.sid
360 }
361
362 pub fn publisher_id(&self) -> &str {
363 &self.publisher_id
364 }
365
366 pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
367 self.frames_rx.clone()
368 }
369}
370
371#[derive(Clone)]
372pub enum RemoteVideoTrackUpdate {
373 Subscribed(Arc<RemoteVideoTrack>),
374 Unsubscribed { publisher_id: Sid, track_id: Sid },
375}
376
377#[derive(Clone)]
378pub struct MacOSDisplay {
379 frames: (
380 async_broadcast::Sender<Frame>,
381 async_broadcast::Receiver<Frame>,
382 ),
383}
384
385impl MacOSDisplay {
386 pub fn new() -> Self {
387 Self {
388 frames: async_broadcast::broadcast(128),
389 }
390 }
391
392 pub fn send_frame(&self, frame: Frame) {
393 self.frames.0.try_broadcast(frame).unwrap();
394 }
395}
396
/// A labelled stand-in for a video frame; it carries no pixel data.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    /// Free-form label that lets tests tell frames apart.
    pub label: String,
    /// Frame width.
    pub width: usize,
    /// Frame height.
    pub height: usize,
}
403
404impl Frame {
405 pub fn width(&self) -> usize {
406 self.width
407 }
408
409 pub fn height(&self) -> usize {
410 self.height
411 }
412
413 pub fn image(&self) -> CVImageBuffer {
414 unimplemented!("you can't call this in test mode")
415 }
416}