1use crate::{
2 db::{NewUserParams, TestDb, UserId},
3 executor::Executor,
4 rpc::{Server, CLEANUP_TIMEOUT},
5 AppState,
6};
7use anyhow::anyhow;
8use call::ActiveCall;
9use client::{
10 self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore,
11};
12use collections::{HashMap, HashSet};
13use fs::FakeFs;
14use futures::{channel::oneshot, StreamExt as _};
15use gpui::{executor::Deterministic, test::EmptyView, ModelHandle, TestAppContext, ViewHandle};
16use language::LanguageRegistry;
17use parking_lot::Mutex;
18use project::{Project, WorktreeId};
19use settings::Settings;
20use std::{
21 cell::{Ref, RefCell, RefMut},
22 env,
23 ops::{Deref, DerefMut},
24 path::Path,
25 sync::{
26 atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
27 Arc,
28 },
29};
30use theme::ThemeRegistry;
31use util::http::FakeHttpClient;
32use workspace::Workspace;
33
34mod integration_tests;
35mod randomized_integration_tests;
36
37struct TestServer {
38 app_state: Arc<AppState>,
39 server: Arc<Server>,
40 connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
41 forbid_connections: Arc<AtomicBool>,
42 _test_db: TestDb,
43 test_live_kit_server: Arc<live_kit_client::TestServer>,
44}
45
46impl TestServer {
47 async fn start(deterministic: &Arc<Deterministic>) -> Self {
48 static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
49
50 let use_postgres = env::var("USE_POSTGRES").ok();
51 let use_postgres = use_postgres.as_deref();
52 let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
53 TestDb::postgres(deterministic.build_background())
54 } else {
55 TestDb::sqlite(deterministic.build_background())
56 };
57 let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
58 let live_kit_server = live_kit_client::TestServer::create(
59 format!("http://livekit.{}.test", live_kit_server_id),
60 format!("devkey-{}", live_kit_server_id),
61 format!("secret-{}", live_kit_server_id),
62 deterministic.build_background(),
63 )
64 .unwrap();
65 let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
66 let epoch = app_state
67 .db
68 .create_server(&app_state.config.zed_environment)
69 .await
70 .unwrap();
71 let server = Server::new(
72 epoch,
73 app_state.clone(),
74 Executor::Deterministic(deterministic.build_background()),
75 );
76 server.start().await.unwrap();
77 // Advance clock to ensure the server's cleanup task is finished.
78 deterministic.advance_clock(CLEANUP_TIMEOUT);
79 Self {
80 app_state,
81 server,
82 connection_killers: Default::default(),
83 forbid_connections: Default::default(),
84 _test_db: test_db,
85 test_live_kit_server: live_kit_server,
86 }
87 }
88
89 async fn reset(&self) {
90 self.app_state.db.reset();
91 let epoch = self
92 .app_state
93 .db
94 .create_server(&self.app_state.config.zed_environment)
95 .await
96 .unwrap();
97 self.server.reset(epoch);
98 }
99
100 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
101 cx.update(|cx| {
102 cx.set_global(Settings::test(cx));
103 });
104
105 let http = FakeHttpClient::with_404_response();
106 let user_id = if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await
107 {
108 user.id
109 } else {
110 self.app_state
111 .db
112 .create_user(
113 &format!("{name}@example.com"),
114 false,
115 NewUserParams {
116 github_login: name.into(),
117 github_user_id: 0,
118 invite_count: 0,
119 },
120 )
121 .await
122 .expect("creating user failed")
123 .user_id
124 };
125 let client_name = name.to_string();
126 let mut client = cx.read(|cx| Client::new(http.clone(), cx));
127 let server = self.server.clone();
128 let db = self.app_state.db.clone();
129 let connection_killers = self.connection_killers.clone();
130 let forbid_connections = self.forbid_connections.clone();
131
132 Arc::get_mut(&mut client)
133 .unwrap()
134 .set_id(user_id.0 as usize)
135 .override_authenticate(move |cx| {
136 cx.spawn(|_| async move {
137 let access_token = "the-token".to_string();
138 Ok(Credentials {
139 user_id: user_id.0 as u64,
140 access_token,
141 })
142 })
143 })
144 .override_establish_connection(move |credentials, cx| {
145 assert_eq!(credentials.user_id, user_id.0 as u64);
146 assert_eq!(credentials.access_token, "the-token");
147
148 let server = server.clone();
149 let db = db.clone();
150 let connection_killers = connection_killers.clone();
151 let forbid_connections = forbid_connections.clone();
152 let client_name = client_name.clone();
153 cx.spawn(move |cx| async move {
154 if forbid_connections.load(SeqCst) {
155 Err(EstablishConnectionError::other(anyhow!(
156 "server is forbidding connections"
157 )))
158 } else {
159 let (client_conn, server_conn, killed) =
160 Connection::in_memory(cx.background());
161 let (connection_id_tx, connection_id_rx) = oneshot::channel();
162 let user = db
163 .get_user_by_id(user_id)
164 .await
165 .expect("retrieving user failed")
166 .unwrap();
167 cx.background()
168 .spawn(server.handle_connection(
169 server_conn,
170 client_name,
171 user,
172 Some(connection_id_tx),
173 Executor::Deterministic(cx.background()),
174 ))
175 .detach();
176 let connection_id = connection_id_rx.await.unwrap();
177 connection_killers
178 .lock()
179 .insert(connection_id.into(), killed);
180 Ok(client_conn)
181 }
182 })
183 });
184
185 let fs = FakeFs::new(cx.background());
186 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
187 let app_state = Arc::new(workspace::AppState {
188 client: client.clone(),
189 user_store: user_store.clone(),
190 languages: Arc::new(LanguageRegistry::test()),
191 themes: ThemeRegistry::new((), cx.font_cache()),
192 fs: fs.clone(),
193 build_window_options: |_, _, _| Default::default(),
194 initialize_workspace: |_, _, _| unimplemented!(),
195 dock_default_item_factory: |_, _| None,
196 background_actions: || &[],
197 });
198
199 Project::init(&client);
200 cx.update(|cx| {
201 workspace::init(app_state.clone(), cx);
202 call::init(client.clone(), user_store.clone(), cx);
203 });
204
205 client
206 .authenticate_and_connect(false, &cx.to_async())
207 .await
208 .unwrap();
209
210 let client = TestClient {
211 client,
212 username: name.to_string(),
213 state: Default::default(),
214 user_store,
215 fs,
216 language_registry: Arc::new(LanguageRegistry::test()),
217 };
218 client.wait_for_current_user(cx).await;
219 client
220 }
221
222 fn disconnect_client(&self, peer_id: PeerId) {
223 self.connection_killers
224 .lock()
225 .remove(&peer_id)
226 .unwrap()
227 .store(true, SeqCst);
228 }
229
230 fn forbid_connections(&self) {
231 self.forbid_connections.store(true, SeqCst);
232 }
233
234 fn allow_connections(&self) {
235 self.forbid_connections.store(false, SeqCst);
236 }
237
238 async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
239 for ix in 1..clients.len() {
240 let (left, right) = clients.split_at_mut(ix);
241 let (client_a, cx_a) = left.last_mut().unwrap();
242 for (client_b, cx_b) in right {
243 client_a
244 .user_store
245 .update(*cx_a, |store, cx| {
246 store.request_contact(client_b.user_id().unwrap(), cx)
247 })
248 .await
249 .unwrap();
250 cx_a.foreground().run_until_parked();
251 client_b
252 .user_store
253 .update(*cx_b, |store, cx| {
254 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
255 })
256 .await
257 .unwrap();
258 }
259 }
260 }
261
262 async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
263 self.make_contacts(clients).await;
264
265 let (left, right) = clients.split_at_mut(1);
266 let (_client_a, cx_a) = &mut left[0];
267 let active_call_a = cx_a.read(ActiveCall::global);
268
269 for (client_b, cx_b) in right {
270 let user_id_b = client_b.current_user_id(*cx_b).to_proto();
271 active_call_a
272 .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx))
273 .await
274 .unwrap();
275
276 cx_b.foreground().run_until_parked();
277 let active_call_b = cx_b.read(ActiveCall::global);
278 active_call_b
279 .update(*cx_b, |call, cx| call.accept_incoming(cx))
280 .await
281 .unwrap();
282 }
283 }
284
285 async fn build_app_state(
286 test_db: &TestDb,
287 fake_server: &live_kit_client::TestServer,
288 ) -> Arc<AppState> {
289 Arc::new(AppState {
290 db: test_db.db().clone(),
291 live_kit_client: Some(Arc::new(fake_server.create_api_client())),
292 config: Default::default(),
293 })
294 }
295}
296
297impl Deref for TestServer {
298 type Target = Server;
299
300 fn deref(&self) -> &Self::Target {
301 &self.server
302 }
303}
304
305impl Drop for TestServer {
306 fn drop(&mut self) {
307 self.server.teardown();
308 self.test_live_kit_server.teardown().unwrap();
309 }
310}
311
312struct TestClient {
313 client: Arc<Client>,
314 username: String,
315 state: RefCell<TestClientState>,
316 pub user_store: ModelHandle<UserStore>,
317 language_registry: Arc<LanguageRegistry>,
318 fs: Arc<FakeFs>,
319}
320
321#[derive(Default)]
322struct TestClientState {
323 local_projects: Vec<ModelHandle<Project>>,
324 remote_projects: Vec<ModelHandle<Project>>,
325 buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
326}
327
328impl Deref for TestClient {
329 type Target = Arc<Client>;
330
331 fn deref(&self) -> &Self::Target {
332 &self.client
333 }
334}
335
/// A snapshot of a client's contact lists, reduced to GitHub logins.
///
/// Derives `Debug` and `PartialEq` so a whole summary can be compared with
/// `assert_eq!` and printed in failure messages.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct ContactsSummary {
    /// Logins of the users that are currently contacts.
    pub current: Vec<String>,
    /// Logins of the users this client has sent a contact request to.
    pub outgoing_requests: Vec<String>,
    /// Logins of the users that have sent this client a contact request.
    pub incoming_requests: Vec<String>,
}
341
342impl TestClient {
343 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
344 UserId::from_proto(
345 self.user_store
346 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
347 )
348 }
349
350 async fn wait_for_current_user(&self, cx: &TestAppContext) {
351 let mut authed_user = self
352 .user_store
353 .read_with(cx, |user_store, _| user_store.watch_current_user());
354 while authed_user.next().await.unwrap().is_none() {}
355 }
356
357 async fn clear_contacts(&self, cx: &mut TestAppContext) {
358 self.user_store
359 .update(cx, |store, _| store.clear_contacts())
360 .await;
361 }
362
363 fn local_projects<'a>(&'a self) -> impl Deref<Target = Vec<ModelHandle<Project>>> + 'a {
364 Ref::map(self.state.borrow(), |state| &state.local_projects)
365 }
366
367 fn remote_projects<'a>(&'a self) -> impl Deref<Target = Vec<ModelHandle<Project>>> + 'a {
368 Ref::map(self.state.borrow(), |state| &state.remote_projects)
369 }
370
371 fn local_projects_mut<'a>(&'a self) -> impl DerefMut<Target = Vec<ModelHandle<Project>>> + 'a {
372 RefMut::map(self.state.borrow_mut(), |state| &mut state.local_projects)
373 }
374
375 fn remote_projects_mut<'a>(&'a self) -> impl DerefMut<Target = Vec<ModelHandle<Project>>> + 'a {
376 RefMut::map(self.state.borrow_mut(), |state| &mut state.remote_projects)
377 }
378
379 fn buffers_for_project<'a>(
380 &'a self,
381 project: &ModelHandle<Project>,
382 ) -> impl DerefMut<Target = HashSet<ModelHandle<language::Buffer>>> + 'a {
383 RefMut::map(self.state.borrow_mut(), |state| {
384 state.buffers.entry(project.clone()).or_default()
385 })
386 }
387
388 fn buffers<'a>(
389 &'a self,
390 ) -> impl DerefMut<Target = HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>> + 'a
391 {
392 RefMut::map(self.state.borrow_mut(), |state| &mut state.buffers)
393 }
394
395 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
396 self.user_store.read_with(cx, |store, _| ContactsSummary {
397 current: store
398 .contacts()
399 .iter()
400 .map(|contact| contact.user.github_login.clone())
401 .collect(),
402 outgoing_requests: store
403 .outgoing_contact_requests()
404 .iter()
405 .map(|user| user.github_login.clone())
406 .collect(),
407 incoming_requests: store
408 .incoming_contact_requests()
409 .iter()
410 .map(|user| user.github_login.clone())
411 .collect(),
412 })
413 }
414
415 async fn build_local_project(
416 &self,
417 root_path: impl AsRef<Path>,
418 cx: &mut TestAppContext,
419 ) -> (ModelHandle<Project>, WorktreeId) {
420 let project = cx.update(|cx| {
421 Project::local(
422 self.client.clone(),
423 self.user_store.clone(),
424 self.language_registry.clone(),
425 self.fs.clone(),
426 cx,
427 )
428 });
429 let (worktree, _) = project
430 .update(cx, |p, cx| {
431 p.find_or_create_local_worktree(root_path, true, cx)
432 })
433 .await
434 .unwrap();
435 worktree
436 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
437 .await;
438 (project, worktree.read_with(cx, |tree, _| tree.id()))
439 }
440
441 async fn build_remote_project(
442 &self,
443 host_project_id: u64,
444 guest_cx: &mut TestAppContext,
445 ) -> ModelHandle<Project> {
446 let active_call = guest_cx.read(ActiveCall::global);
447 let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone());
448 room.update(guest_cx, |room, cx| {
449 room.join_project(
450 host_project_id,
451 self.language_registry.clone(),
452 self.fs.clone(),
453 cx,
454 )
455 })
456 .await
457 .unwrap()
458 }
459
460 fn build_workspace(
461 &self,
462 project: &ModelHandle<Project>,
463 cx: &mut TestAppContext,
464 ) -> ViewHandle<Workspace> {
465 let (_, root_view) = cx.add_window(|_| EmptyView);
466 cx.add_view(&root_view, |cx| Workspace::test_new(project.clone(), cx))
467 }
468}
469
470impl Drop for TestClient {
471 fn drop(&mut self) {
472 self.client.teardown();
473 }
474}