1use crate::{
2 db::{NewUserParams, TestDb, UserId},
3 executor::Executor,
4 rpc::{Server, CLEANUP_TIMEOUT},
5 AppState,
6};
7use anyhow::anyhow;
8use call::ActiveCall;
9use client::{
10 self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials,
11 EstablishConnectionError, UserStore,
12};
13use collections::{HashMap, HashSet};
14use fs::{FakeFs, HomeDir};
15use futures::{channel::oneshot, StreamExt as _};
16use gpui::{
17 executor::Deterministic, test::EmptyView, ModelHandle, Task, TestAppContext, ViewHandle,
18};
19use language::LanguageRegistry;
20use parking_lot::Mutex;
21use project::{Project, WorktreeId};
22use settings::Settings;
23use std::{
24 cell::{Ref, RefCell, RefMut},
25 env,
26 ops::{Deref, DerefMut},
27 path::Path,
28 sync::{
29 atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
30 Arc,
31 },
32};
33use theme::ThemeRegistry;
34use workspace::Workspace;
35
36mod integration_tests;
37mod randomized_integration_tests;
38
39struct TestServer {
40 app_state: Arc<AppState>,
41 server: Arc<Server>,
42 connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
43 forbid_connections: Arc<AtomicBool>,
44 _test_db: TestDb,
45 test_live_kit_server: Arc<live_kit_client::TestServer>,
46}
47
48impl TestServer {
49 async fn start(deterministic: &Arc<Deterministic>) -> Self {
50 static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
51
52 let use_postgres = env::var("USE_POSTGRES").ok();
53 let use_postgres = use_postgres.as_deref();
54 let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
55 TestDb::postgres(deterministic.build_background())
56 } else {
57 TestDb::sqlite(deterministic.build_background())
58 };
59 let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
60 let live_kit_server = live_kit_client::TestServer::create(
61 format!("http://livekit.{}.test", live_kit_server_id),
62 format!("devkey-{}", live_kit_server_id),
63 format!("secret-{}", live_kit_server_id),
64 deterministic.build_background(),
65 )
66 .unwrap();
67 let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
68 let epoch = app_state
69 .db
70 .create_server(&app_state.config.zed_environment)
71 .await
72 .unwrap();
73 let server = Server::new(
74 epoch,
75 app_state.clone(),
76 Executor::Deterministic(deterministic.build_background()),
77 );
78 server.start().await.unwrap();
79 // Advance clock to ensure the server's cleanup task is finished.
80 deterministic.advance_clock(CLEANUP_TIMEOUT);
81 Self {
82 app_state,
83 server,
84 connection_killers: Default::default(),
85 forbid_connections: Default::default(),
86 _test_db: test_db,
87 test_live_kit_server: live_kit_server,
88 }
89 }
90
91 async fn reset(&self) {
92 self.app_state.db.reset();
93 let epoch = self
94 .app_state
95 .db
96 .create_server(&self.app_state.config.zed_environment)
97 .await
98 .unwrap();
99 self.server.reset(epoch);
100 }
101
102 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
103 cx.update(|cx| {
104 cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf()));
105
106 let mut settings = Settings::test(cx);
107 settings.projects_online_by_default = false;
108 cx.set_global(settings);
109 });
110
111 let http = FakeHttpClient::with_404_response();
112 let user_id = if let Ok(Some(user)) = self
113 .app_state
114 .db
115 .get_user_by_github_account(name, None)
116 .await
117 {
118 user.id
119 } else {
120 self.app_state
121 .db
122 .create_user(
123 &format!("{name}@example.com"),
124 false,
125 NewUserParams {
126 github_login: name.into(),
127 github_user_id: 0,
128 invite_count: 0,
129 },
130 )
131 .await
132 .expect("creating user failed")
133 .user_id
134 };
135 let client_name = name.to_string();
136 let mut client = cx.read(|cx| Client::new(http.clone(), cx));
137 let server = self.server.clone();
138 let db = self.app_state.db.clone();
139 let connection_killers = self.connection_killers.clone();
140 let forbid_connections = self.forbid_connections.clone();
141
142 Arc::get_mut(&mut client)
143 .unwrap()
144 .set_id(user_id.0 as usize)
145 .override_authenticate(move |cx| {
146 cx.spawn(|_| async move {
147 let access_token = "the-token".to_string();
148 Ok(Credentials {
149 user_id: user_id.0 as u64,
150 access_token,
151 })
152 })
153 })
154 .override_establish_connection(move |credentials, cx| {
155 assert_eq!(credentials.user_id, user_id.0 as u64);
156 assert_eq!(credentials.access_token, "the-token");
157
158 let server = server.clone();
159 let db = db.clone();
160 let connection_killers = connection_killers.clone();
161 let forbid_connections = forbid_connections.clone();
162 let client_name = client_name.clone();
163 cx.spawn(move |cx| async move {
164 if forbid_connections.load(SeqCst) {
165 Err(EstablishConnectionError::other(anyhow!(
166 "server is forbidding connections"
167 )))
168 } else {
169 let (client_conn, server_conn, killed) =
170 Connection::in_memory(cx.background());
171 let (connection_id_tx, connection_id_rx) = oneshot::channel();
172 let user = db
173 .get_user_by_id(user_id)
174 .await
175 .expect("retrieving user failed")
176 .unwrap();
177 cx.background()
178 .spawn(server.handle_connection(
179 server_conn,
180 client_name,
181 user,
182 Some(connection_id_tx),
183 Executor::Deterministic(cx.background()),
184 ))
185 .detach();
186 let connection_id = connection_id_rx.await.unwrap();
187 connection_killers
188 .lock()
189 .insert(connection_id.into(), killed);
190 Ok(client_conn)
191 }
192 })
193 });
194
195 let fs = FakeFs::new(cx.background());
196 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
197 let app_state = Arc::new(workspace::AppState {
198 client: client.clone(),
199 user_store: user_store.clone(),
200 languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
201 themes: ThemeRegistry::new((), cx.font_cache()),
202 fs: fs.clone(),
203 build_window_options: Default::default,
204 initialize_workspace: |_, _, _| unimplemented!(),
205 dock_default_item_factory: |_, _| unimplemented!(),
206 });
207
208 Project::init(&client);
209 cx.update(|cx| {
210 workspace::init(app_state.clone(), cx);
211 call::init(client.clone(), user_store.clone(), cx);
212 });
213
214 client
215 .authenticate_and_connect(false, &cx.to_async())
216 .await
217 .unwrap();
218
219 let client = TestClient {
220 client,
221 username: name.to_string(),
222 state: Default::default(),
223 user_store,
224 fs,
225 language_registry: Arc::new(LanguageRegistry::test()),
226 };
227 client.wait_for_current_user(cx).await;
228 client
229 }
230
231 fn disconnect_client(&self, peer_id: PeerId) {
232 self.connection_killers
233 .lock()
234 .remove(&peer_id)
235 .unwrap()
236 .store(true, SeqCst);
237 }
238
239 fn forbid_connections(&self) {
240 self.forbid_connections.store(true, SeqCst);
241 }
242
243 fn allow_connections(&self) {
244 self.forbid_connections.store(false, SeqCst);
245 }
246
247 async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
248 for ix in 1..clients.len() {
249 let (left, right) = clients.split_at_mut(ix);
250 let (client_a, cx_a) = left.last_mut().unwrap();
251 for (client_b, cx_b) in right {
252 client_a
253 .user_store
254 .update(*cx_a, |store, cx| {
255 store.request_contact(client_b.user_id().unwrap(), cx)
256 })
257 .await
258 .unwrap();
259 cx_a.foreground().run_until_parked();
260 client_b
261 .user_store
262 .update(*cx_b, |store, cx| {
263 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
264 })
265 .await
266 .unwrap();
267 }
268 }
269 }
270
271 async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
272 self.make_contacts(clients).await;
273
274 let (left, right) = clients.split_at_mut(1);
275 let (_client_a, cx_a) = &mut left[0];
276 let active_call_a = cx_a.read(ActiveCall::global);
277
278 for (client_b, cx_b) in right {
279 let user_id_b = client_b.current_user_id(*cx_b).to_proto();
280 active_call_a
281 .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx))
282 .await
283 .unwrap();
284
285 cx_b.foreground().run_until_parked();
286 let active_call_b = cx_b.read(ActiveCall::global);
287 active_call_b
288 .update(*cx_b, |call, cx| call.accept_incoming(cx))
289 .await
290 .unwrap();
291 }
292 }
293
294 async fn build_app_state(
295 test_db: &TestDb,
296 fake_server: &live_kit_client::TestServer,
297 ) -> Arc<AppState> {
298 Arc::new(AppState {
299 db: test_db.db().clone(),
300 live_kit_client: Some(Arc::new(fake_server.create_api_client())),
301 config: Default::default(),
302 })
303 }
304}
305
306impl Deref for TestServer {
307 type Target = Server;
308
309 fn deref(&self) -> &Self::Target {
310 &self.server
311 }
312}
313
314impl Drop for TestServer {
315 fn drop(&mut self) {
316 self.server.teardown();
317 self.test_live_kit_server.teardown().unwrap();
318 }
319}
320
321struct TestClient {
322 client: Arc<Client>,
323 username: String,
324 state: RefCell<TestClientState>,
325 pub user_store: ModelHandle<UserStore>,
326 language_registry: Arc<LanguageRegistry>,
327 fs: Arc<FakeFs>,
328}
329
330#[derive(Default)]
331struct TestClientState {
332 local_projects: Vec<ModelHandle<Project>>,
333 remote_projects: Vec<ModelHandle<Project>>,
334 buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
335}
336
337impl Deref for TestClient {
338 type Target = Arc<Client>;
339
340 fn deref(&self) -> &Self::Target {
341 &self.client
342 }
343}
344
/// GitHub logins describing a user store's contact state, as collected by
/// `TestClient::summarize_contacts`.
struct ContactsSummary {
    /// Logins of the users that are currently contacts.
    pub current: Vec<String>,
    /// Logins of the users listed as outgoing contact requests.
    pub outgoing_requests: Vec<String>,
    /// Logins of the users listed as incoming contact requests.
    pub incoming_requests: Vec<String>,
}
350
351impl TestClient {
352 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
353 UserId::from_proto(
354 self.user_store
355 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
356 )
357 }
358
359 async fn wait_for_current_user(&self, cx: &TestAppContext) {
360 let mut authed_user = self
361 .user_store
362 .read_with(cx, |user_store, _| user_store.watch_current_user());
363 while authed_user.next().await.unwrap().is_none() {}
364 }
365
366 async fn clear_contacts(&self, cx: &mut TestAppContext) {
367 self.user_store
368 .update(cx, |store, _| store.clear_contacts())
369 .await;
370 }
371
372 fn local_projects<'a>(&'a self) -> impl Deref<Target = Vec<ModelHandle<Project>>> + 'a {
373 Ref::map(self.state.borrow(), |state| &state.local_projects)
374 }
375
376 fn remote_projects<'a>(&'a self) -> impl Deref<Target = Vec<ModelHandle<Project>>> + 'a {
377 Ref::map(self.state.borrow(), |state| &state.remote_projects)
378 }
379
380 fn local_projects_mut<'a>(&'a self) -> impl DerefMut<Target = Vec<ModelHandle<Project>>> + 'a {
381 RefMut::map(self.state.borrow_mut(), |state| &mut state.local_projects)
382 }
383
384 fn remote_projects_mut<'a>(&'a self) -> impl DerefMut<Target = Vec<ModelHandle<Project>>> + 'a {
385 RefMut::map(self.state.borrow_mut(), |state| &mut state.remote_projects)
386 }
387
388 fn buffers_for_project<'a>(
389 &'a self,
390 project: &ModelHandle<Project>,
391 ) -> impl DerefMut<Target = HashSet<ModelHandle<language::Buffer>>> + 'a {
392 RefMut::map(self.state.borrow_mut(), |state| {
393 state.buffers.entry(project.clone()).or_default()
394 })
395 }
396
397 fn buffers<'a>(
398 &'a self,
399 ) -> impl DerefMut<Target = HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>> + 'a
400 {
401 RefMut::map(self.state.borrow_mut(), |state| &mut state.buffers)
402 }
403
404 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
405 self.user_store.read_with(cx, |store, _| ContactsSummary {
406 current: store
407 .contacts()
408 .iter()
409 .map(|contact| contact.user.github_login.clone())
410 .collect(),
411 outgoing_requests: store
412 .outgoing_contact_requests()
413 .iter()
414 .map(|user| user.github_login.clone())
415 .collect(),
416 incoming_requests: store
417 .incoming_contact_requests()
418 .iter()
419 .map(|user| user.github_login.clone())
420 .collect(),
421 })
422 }
423
424 async fn build_local_project(
425 &self,
426 root_path: impl AsRef<Path>,
427 cx: &mut TestAppContext,
428 ) -> (ModelHandle<Project>, WorktreeId) {
429 let project = cx.update(|cx| {
430 Project::local(
431 self.client.clone(),
432 self.user_store.clone(),
433 self.language_registry.clone(),
434 self.fs.clone(),
435 cx,
436 )
437 });
438 let (worktree, _) = project
439 .update(cx, |p, cx| {
440 p.find_or_create_local_worktree(root_path, true, cx)
441 })
442 .await
443 .unwrap();
444 worktree
445 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
446 .await;
447 (project, worktree.read_with(cx, |tree, _| tree.id()))
448 }
449
450 async fn build_remote_project(
451 &self,
452 host_project_id: u64,
453 guest_cx: &mut TestAppContext,
454 ) -> ModelHandle<Project> {
455 let active_call = guest_cx.read(ActiveCall::global);
456 let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone());
457 room.update(guest_cx, |room, cx| {
458 room.join_project(
459 host_project_id,
460 self.language_registry.clone(),
461 self.fs.clone(),
462 cx,
463 )
464 })
465 .await
466 .unwrap()
467 }
468
469 fn build_workspace(
470 &self,
471 project: &ModelHandle<Project>,
472 cx: &mut TestAppContext,
473 ) -> ViewHandle<Workspace> {
474 let (_, root_view) = cx.add_window(|_| EmptyView);
475 cx.add_view(&root_view, |cx| {
476 Workspace::new(
477 Default::default(),
478 0,
479 project.clone(),
480 |_, _| unimplemented!(),
481 cx,
482 )
483 })
484 }
485}
486
487impl Drop for TestClient {
488 fn drop(&mut self) {
489 self.client.teardown();
490 }
491}