1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use collections::HashMap;
4use futures::{Stream, StreamExt};
5use gpui::executor::{self, Background};
6use lazy_static::lazy_static;
7use live_kit_server::token;
8use media::core_video::CVImageBuffer;
9use parking_lot::Mutex;
10use std::{future::Future, sync::Arc};
11
// Global registry of test servers, keyed by server URL. `TestServer::create`
// inserts into it, `TestServer::get` looks servers up by URL, and
// `TestServer::teardown` removes the entry again.
lazy_static! {
    static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
}
15
/// In-memory stand-in for a LiveKit server, registered globally by URL.
pub struct TestServer {
    /// URL under which this server is registered in `SERVERS`.
    pub url: String,
    /// API key passed to `token::create` when issuing room tokens.
    pub api_key: String,
    /// Secret used both to create and to validate room tokens.
    pub secret_key: String,
    /// Rooms that currently exist on this server, keyed by room name.
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    /// Executor used to simulate random delays and handed to remote tracks.
    background: Arc<Background>,
}
23
24impl TestServer {
25 pub fn create(
26 url: String,
27 api_key: String,
28 secret_key: String,
29 background: Arc<Background>,
30 ) -> Result<Arc<TestServer>> {
31 let mut servers = SERVERS.lock();
32 if servers.contains_key(&url) {
33 Err(anyhow!("a server with url {:?} already exists", url))
34 } else {
35 let server = Arc::new(TestServer {
36 url: url.clone(),
37 api_key,
38 secret_key,
39 rooms: Default::default(),
40 background,
41 });
42 servers.insert(url, server.clone());
43 Ok(server)
44 }
45 }
46
47 fn get(url: &str) -> Result<Arc<TestServer>> {
48 Ok(SERVERS
49 .lock()
50 .get(url)
51 .ok_or_else(|| anyhow!("no server found for url"))?
52 .clone())
53 }
54
55 pub fn teardown(&self) -> Result<()> {
56 SERVERS
57 .lock()
58 .remove(&self.url)
59 .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
60 Ok(())
61 }
62
63 pub fn create_api_client(&self) -> TestApiClient {
64 TestApiClient {
65 url: self.url.clone(),
66 }
67 }
68
69 async fn create_room(&self, room: String) -> Result<()> {
70 self.background.simulate_random_delay().await;
71 let mut server_rooms = self.rooms.lock();
72 if server_rooms.contains_key(&room) {
73 Err(anyhow!("room {:?} already exists", room))
74 } else {
75 server_rooms.insert(room, Default::default());
76 Ok(())
77 }
78 }
79
80 async fn delete_room(&self, room: String) -> Result<()> {
81 // TODO: clear state associated with all `Room`s.
82 self.background.simulate_random_delay().await;
83 let mut server_rooms = self.rooms.lock();
84 server_rooms
85 .remove(&room)
86 .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
87 Ok(())
88 }
89
90 async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
91 self.background.simulate_random_delay().await;
92 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
93 let identity = claims.sub.unwrap().to_string();
94 let room_name = claims.video.room.unwrap();
95 let mut server_rooms = self.rooms.lock();
96 let room = server_rooms
97 .get_mut(&*room_name)
98 .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
99 if room.client_rooms.contains_key(&identity) {
100 Err(anyhow!(
101 "{:?} attempted to join room {:?} twice",
102 identity,
103 room_name
104 ))
105 } else {
106 room.client_rooms.insert(identity, client_room);
107 Ok(())
108 }
109 }
110
111 async fn leave_room(&self, token: String) -> Result<()> {
112 self.background.simulate_random_delay().await;
113 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
114 let identity = claims.sub.unwrap().to_string();
115 let room_name = claims.video.room.unwrap();
116 let mut server_rooms = self.rooms.lock();
117 let room = server_rooms
118 .get_mut(&*room_name)
119 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
120 room.client_rooms.remove(&identity).ok_or_else(|| {
121 anyhow!(
122 "{:?} attempted to leave room {:?} before joining it",
123 identity,
124 room_name
125 )
126 })?;
127 Ok(())
128 }
129
130 async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
131 // TODO: clear state associated with the `Room`.
132
133 self.background.simulate_random_delay().await;
134 let mut server_rooms = self.rooms.lock();
135 let room = server_rooms
136 .get_mut(&room_name)
137 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
138 room.client_rooms.remove(&identity).ok_or_else(|| {
139 anyhow!(
140 "participant {:?} did not join room {:?}",
141 identity,
142 room_name
143 )
144 })?;
145 Ok(())
146 }
147
148 async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
149 self.background.simulate_random_delay().await;
150 let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
151 let identity = claims.sub.unwrap().to_string();
152 let room_name = claims.video.room.unwrap();
153
154 let mut server_rooms = self.rooms.lock();
155 let room = server_rooms
156 .get_mut(&*room_name)
157 .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
158
159 let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
160 sid: nanoid::nanoid!(17),
161 publisher_id: identity.clone(),
162 frames_rx: local_track.frames_rx.clone(),
163 background: self.background.clone(),
164 }));
165
166 for (id, client_room) in &room.client_rooms {
167 if *id != identity {
168 let _ = client_room
169 .0
170 .lock()
171 .video_track_updates
172 .0
173 .try_broadcast(update.clone())
174 .unwrap();
175 }
176 }
177
178 Ok(())
179 }
180}
181
/// Server-side state of a single room.
#[derive(Default)]
struct TestServerRoom {
    /// Client `Room`s that have joined, keyed by participant identity.
    client_rooms: HashMap<Sid, Arc<Room>>,
}

// Intentionally empty: no server-room behavior is implemented yet.
impl TestServerRoom {}
188
/// API client for a `TestServer`; it stores only the server's URL and
/// resolves the server from the global registry on each call.
pub struct TestApiClient {
    /// URL of the server this client talks to.
    url: String,
}
192
193#[async_trait]
194impl live_kit_server::api::Client for TestApiClient {
195 fn url(&self) -> &str {
196 &self.url
197 }
198
199 async fn create_room(&self, name: String) -> Result<()> {
200 let server = TestServer::get(&self.url)?;
201 server.create_room(name).await?;
202 Ok(())
203 }
204
205 async fn delete_room(&self, name: String) -> Result<()> {
206 let server = TestServer::get(&self.url)?;
207 server.delete_room(name).await?;
208 Ok(())
209 }
210
211 async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
212 let server = TestServer::get(&self.url)?;
213 server.remove_participant(room, identity).await?;
214 Ok(())
215 }
216
217 fn room_token(&self, room: &str, identity: &str) -> Result<String> {
218 let server = TestServer::get(&self.url)?;
219 token::create(
220 &server.api_key,
221 &server.secret_key,
222 Some(identity),
223 token::VideoGrant::to_join(room),
224 )
225 }
226}
227
/// String identifier used in this file for participant identities and track ids.
pub type Sid = String;
229
/// Mutable state of a client `Room`, guarded by the room's mutex.
struct RoomState {
    /// `None` until `Room::connect` succeeds; taken again when the room is dropped.
    connection: Option<ConnectionState>,
    /// Displays returned by `Room::display_sources`; set via `Room::set_display_sources`.
    display_sources: Vec<MacOSDisplay>,
    /// Sender/receiver pair for track updates: the server broadcasts on the
    /// sender, and `Room::remote_video_track_updates` hands out clones of the
    /// receiver.
    video_track_updates: (
        async_broadcast::Sender<RemoteVideoTrackUpdate>,
        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
    ),
}
238
/// Connection details recorded by `Room::connect` after a successful join.
struct ConnectionState {
    /// URL of the server the room joined; the key for `TestServer::get`.
    url: String,
    /// Token the room joined with; reused for publishing and leaving.
    token: String,
}
243
/// Client-side room handle for tests; all state lives behind a single mutex.
pub struct Room(Mutex<RoomState>);
245
246impl Room {
247 pub fn new() -> Arc<Self> {
248 Arc::new(Self(Mutex::new(RoomState {
249 connection: None,
250 display_sources: Default::default(),
251 video_track_updates: async_broadcast::broadcast(128),
252 })))
253 }
254
255 pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
256 let this = self.clone();
257 let url = url.to_string();
258 let token = token.to_string();
259 async move {
260 let server = TestServer::get(&url)?;
261 server.join_room(token.clone(), this.clone()).await?;
262 this.0.lock().connection = Some(ConnectionState { url, token });
263 Ok(())
264 }
265 }
266
267 pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
268 let this = self.clone();
269 async move {
270 let server = this.test_server();
271 server.background.simulate_random_delay().await;
272 Ok(this.0.lock().display_sources.clone())
273 }
274 }
275
276 pub fn publish_video_track(
277 self: &Arc<Self>,
278 track: &LocalVideoTrack,
279 ) -> impl Future<Output = Result<LocalTrackPublication>> {
280 let this = self.clone();
281 let track = track.clone();
282 async move {
283 this.test_server()
284 .publish_video_track(this.token(), track)
285 .await?;
286 Ok(LocalTrackPublication)
287 }
288 }
289
290 pub fn unpublish_track(&self, _: LocalTrackPublication) {}
291
292 pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
293 Default::default()
294 }
295
296 pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
297 self.0.lock().video_track_updates.1.clone()
298 }
299
300 pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
301 self.0.lock().display_sources = sources;
302 }
303
304 fn test_server(&self) -> Arc<TestServer> {
305 let this = self.0.lock();
306 let connection = this
307 .connection
308 .as_ref()
309 .expect("must be connected to call this method");
310 TestServer::get(&connection.url).unwrap()
311 }
312
313 fn token(&self) -> String {
314 self.0
315 .lock()
316 .connection
317 .as_ref()
318 .expect("must be connected to call this method")
319 .token
320 .clone()
321 }
322}
323
324impl Drop for Room {
325 fn drop(&mut self) {
326 if let Some(connection) = self.0.lock().connection.take() {
327 if let Ok(server) = TestServer::get(&connection.token) {
328 let background = server.background.clone();
329 background
330 .spawn(async move { server.leave_room(connection.token).await.unwrap() })
331 .detach();
332 }
333 }
334 }
335}
336
/// Marker returned by `Room::publish_video_track`; it carries no data.
pub struct LocalTrackPublication;
338
/// Local video track backed by a display's frame channel.
#[derive(Clone)]
pub struct LocalVideoTrack {
    /// Receiver cloned from the display's frame broadcast channel.
    frames_rx: async_broadcast::Receiver<Frame>,
}
343
344impl LocalVideoTrack {
345 pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
346 Self {
347 frames_rx: display.frames.1.clone(),
348 }
349 }
350}
351
/// Remote view of a published video track, as delivered to other participants.
pub struct RemoteVideoTrack {
    /// Track id; generated with `nanoid!(17)` when the track is published.
    sid: Sid,
    /// Identity of the participant that published the track.
    publisher_id: Sid,
    /// Receiver cloned from the publisher's local track.
    frames_rx: async_broadcast::Receiver<Frame>,
    /// Executor on which renderer tasks are spawned.
    background: Arc<executor::Background>,
}
358
359impl RemoteVideoTrack {
360 pub fn sid(&self) -> &str {
361 &self.sid
362 }
363
364 pub fn publisher_id(&self) -> &str {
365 &self.publisher_id
366 }
367
368 pub fn add_renderer<F>(&self, mut callback: F)
369 where
370 F: 'static + Send + Sync + FnMut(Frame),
371 {
372 let mut frames_rx = self.frames_rx.clone();
373 self.background
374 .spawn(async move {
375 while let Some(frame) = frames_rx.next().await {
376 callback(frame)
377 }
378 })
379 .detach();
380 }
381}
382
/// Update delivered through a room's track-update channel.
#[derive(Clone)]
pub enum RemoteVideoTrackUpdate {
    /// A participant published a track; broadcast by the server to the other
    /// participants of the room.
    Subscribed(Arc<RemoteVideoTrack>),
    /// A track went away. Nothing in the visible code constructs this variant.
    Unsubscribed { publisher_id: Sid, track_id: Sid },
}
388
/// Fake display that tests push frames into.
#[derive(Clone)]
pub struct MacOSDisplay {
    /// Sender/receiver pair of the frame channel: `send_frame` broadcasts on
    /// the sender, and local tracks clone the receiver.
    frames: (
        async_broadcast::Sender<Frame>,
        async_broadcast::Receiver<Frame>,
    ),
}
396
397impl MacOSDisplay {
398 pub fn new() -> Self {
399 Self {
400 frames: async_broadcast::broadcast(128),
401 }
402 }
403
404 pub fn send_frame(&self, frame: Frame) {
405 self.frames.0.try_broadcast(frame).unwrap();
406 }
407}
408
/// Test-mode video frame: a label plus dimensions, with no pixel data.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    /// Free-form label attached to the frame.
    pub label: String,
    /// Frame width, as returned by `width()`.
    pub width: usize,
    /// Frame height, as returned by `height()`.
    pub height: usize,
}
415
416impl Frame {
417 pub fn width(&self) -> usize {
418 self.width
419 }
420
421 pub fn height(&self) -> usize {
422 self.height
423 }
424
425 pub fn image(&self) -> CVImageBuffer {
426 unimplemented!("you can't call this in test mode")
427 }
428}