1use crate::{
2 db::{NewUserParams, TestDb, UserId},
3 executor::Executor,
4 rpc::{Server, CLEANUP_TIMEOUT},
5 AppState,
6};
7use anyhow::anyhow;
8use call::ActiveCall;
9use client::{
10 self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials,
11 EstablishConnectionError, UserStore,
12};
13use collections::{HashMap, HashSet};
14use fs::{FakeFs, HomeDir};
15use futures::{channel::oneshot, StreamExt as _};
16use gpui::{
17 executor::Deterministic, test::EmptyView, ModelHandle, Task, TestAppContext, ViewHandle,
18};
19use language::LanguageRegistry;
20use parking_lot::Mutex;
21use project::{Project, WorktreeId};
22use settings::Settings;
23use std::{
24 env,
25 ops::Deref,
26 path::{Path, PathBuf},
27 sync::{
28 atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
29 Arc,
30 },
31};
32use theme::ThemeRegistry;
33use workspace::Workspace;
34
35mod integration_tests;
36mod randomized_integration_tests;
37
/// In-process collaboration server used by the integration tests, together
/// with the knobs the tests use to sever or refuse client connections.
///
/// Dereferences to the wrapped [`Server`].
struct TestServer {
    /// Shared server state (database handle, LiveKit API client, config)
    /// produced by `build_app_state`.
    app_state: Arc<AppState>,
    /// The RPC server under test; created and started in `start`.
    server: Arc<Server>,
    /// Per-connection flags returned by `Connection::in_memory`, keyed by peer
    /// id. `disconnect_client` removes a flag and sets it to `true`.
    connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
    /// While `true`, the connection override installed by `create_client`
    /// fails every new connection attempt.
    forbid_connections: Arc<AtomicBool>,
    /// Never read in this file; presumably held so the backing test database
    /// outlives the server — TODO confirm.
    _test_db: TestDb,
    /// Fake LiveKit server created in `start` and torn down in `Drop`.
    test_live_kit_server: Arc<live_kit_client::TestServer>,
}
46
47impl TestServer {
48 async fn start(deterministic: &Arc<Deterministic>) -> Self {
49 static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
50
51 let use_postgres = env::var("USE_POSTGRES").ok();
52 let use_postgres = use_postgres.as_deref();
53 let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
54 TestDb::postgres(deterministic.build_background())
55 } else {
56 TestDb::sqlite(deterministic.build_background())
57 };
58 let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
59 let live_kit_server = live_kit_client::TestServer::create(
60 format!("http://livekit.{}.test", live_kit_server_id),
61 format!("devkey-{}", live_kit_server_id),
62 format!("secret-{}", live_kit_server_id),
63 deterministic.build_background(),
64 )
65 .unwrap();
66 let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
67 let epoch = app_state
68 .db
69 .create_server(&app_state.config.zed_environment)
70 .await
71 .unwrap();
72 let server = Server::new(
73 epoch,
74 app_state.clone(),
75 Executor::Deterministic(deterministic.build_background()),
76 );
77 server.start().await.unwrap();
78 // Advance clock to ensure the server's cleanup task is finished.
79 deterministic.advance_clock(CLEANUP_TIMEOUT);
80 Self {
81 app_state,
82 server,
83 connection_killers: Default::default(),
84 forbid_connections: Default::default(),
85 _test_db: test_db,
86 test_live_kit_server: live_kit_server,
87 }
88 }
89
90 async fn reset(&self) {
91 self.app_state.db.reset();
92 let epoch = self
93 .app_state
94 .db
95 .create_server(&self.app_state.config.zed_environment)
96 .await
97 .unwrap();
98 self.server.reset(epoch);
99 }
100
101 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
102 cx.update(|cx| {
103 cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf()));
104
105 let mut settings = Settings::test(cx);
106 settings.projects_online_by_default = false;
107 cx.set_global(settings);
108 });
109
110 let http = FakeHttpClient::with_404_response();
111 let user_id = if let Ok(Some(user)) = self
112 .app_state
113 .db
114 .get_user_by_github_account(name, None)
115 .await
116 {
117 user.id
118 } else {
119 self.app_state
120 .db
121 .create_user(
122 &format!("{name}@example.com"),
123 false,
124 NewUserParams {
125 github_login: name.into(),
126 github_user_id: 0,
127 invite_count: 0,
128 },
129 )
130 .await
131 .expect("creating user failed")
132 .user_id
133 };
134 let client_name = name.to_string();
135 let mut client = cx.read(|cx| Client::new(http.clone(), cx));
136 let server = self.server.clone();
137 let db = self.app_state.db.clone();
138 let connection_killers = self.connection_killers.clone();
139 let forbid_connections = self.forbid_connections.clone();
140
141 Arc::get_mut(&mut client)
142 .unwrap()
143 .set_id(user_id.0 as usize)
144 .override_authenticate(move |cx| {
145 cx.spawn(|_| async move {
146 let access_token = "the-token".to_string();
147 Ok(Credentials {
148 user_id: user_id.0 as u64,
149 access_token,
150 })
151 })
152 })
153 .override_establish_connection(move |credentials, cx| {
154 assert_eq!(credentials.user_id, user_id.0 as u64);
155 assert_eq!(credentials.access_token, "the-token");
156
157 let server = server.clone();
158 let db = db.clone();
159 let connection_killers = connection_killers.clone();
160 let forbid_connections = forbid_connections.clone();
161 let client_name = client_name.clone();
162 cx.spawn(move |cx| async move {
163 if forbid_connections.load(SeqCst) {
164 Err(EstablishConnectionError::other(anyhow!(
165 "server is forbidding connections"
166 )))
167 } else {
168 let (client_conn, server_conn, killed) =
169 Connection::in_memory(cx.background());
170 let (connection_id_tx, connection_id_rx) = oneshot::channel();
171 let user = db
172 .get_user_by_id(user_id)
173 .await
174 .expect("retrieving user failed")
175 .unwrap();
176 cx.background()
177 .spawn(server.handle_connection(
178 server_conn,
179 client_name,
180 user,
181 Some(connection_id_tx),
182 Executor::Deterministic(cx.background()),
183 ))
184 .detach();
185 let connection_id = connection_id_rx.await.unwrap();
186 connection_killers
187 .lock()
188 .insert(connection_id.into(), killed);
189 Ok(client_conn)
190 }
191 })
192 });
193
194 let fs = FakeFs::new(cx.background());
195 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
196 let app_state = Arc::new(workspace::AppState {
197 client: client.clone(),
198 user_store: user_store.clone(),
199 languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
200 themes: ThemeRegistry::new((), cx.font_cache()),
201 fs: fs.clone(),
202 build_window_options: Default::default,
203 initialize_workspace: |_, _, _| unimplemented!(),
204 dock_default_item_factory: |_, _| unimplemented!(),
205 });
206
207 Project::init(&client);
208 cx.update(|cx| {
209 workspace::init(app_state.clone(), cx);
210 call::init(client.clone(), user_store.clone(), cx);
211 });
212
213 client
214 .authenticate_and_connect(false, &cx.to_async())
215 .await
216 .unwrap();
217
218 let client = TestClient {
219 client,
220 username: name.to_string(),
221 local_projects: Default::default(),
222 remote_projects: Default::default(),
223 next_root_dir_id: 0,
224 user_store,
225 fs,
226 language_registry: Arc::new(LanguageRegistry::test()),
227 buffers: Default::default(),
228 };
229 client.wait_for_current_user(cx).await;
230 client
231 }
232
233 fn disconnect_client(&self, peer_id: PeerId) {
234 self.connection_killers
235 .lock()
236 .remove(&peer_id)
237 .unwrap()
238 .store(true, SeqCst);
239 }
240
241 fn forbid_connections(&self) {
242 self.forbid_connections.store(true, SeqCst);
243 }
244
245 fn allow_connections(&self) {
246 self.forbid_connections.store(false, SeqCst);
247 }
248
249 async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
250 for ix in 1..clients.len() {
251 let (left, right) = clients.split_at_mut(ix);
252 let (client_a, cx_a) = left.last_mut().unwrap();
253 for (client_b, cx_b) in right {
254 client_a
255 .user_store
256 .update(*cx_a, |store, cx| {
257 store.request_contact(client_b.user_id().unwrap(), cx)
258 })
259 .await
260 .unwrap();
261 cx_a.foreground().run_until_parked();
262 client_b
263 .user_store
264 .update(*cx_b, |store, cx| {
265 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
266 })
267 .await
268 .unwrap();
269 }
270 }
271 }
272
273 async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
274 self.make_contacts(clients).await;
275
276 let (left, right) = clients.split_at_mut(1);
277 let (_client_a, cx_a) = &mut left[0];
278 let active_call_a = cx_a.read(ActiveCall::global);
279
280 for (client_b, cx_b) in right {
281 let user_id_b = client_b.current_user_id(*cx_b).to_proto();
282 active_call_a
283 .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx))
284 .await
285 .unwrap();
286
287 cx_b.foreground().run_until_parked();
288 let active_call_b = cx_b.read(ActiveCall::global);
289 active_call_b
290 .update(*cx_b, |call, cx| call.accept_incoming(cx))
291 .await
292 .unwrap();
293 }
294 }
295
296 async fn build_app_state(
297 test_db: &TestDb,
298 fake_server: &live_kit_client::TestServer,
299 ) -> Arc<AppState> {
300 Arc::new(AppState {
301 db: test_db.db().clone(),
302 live_kit_client: Some(Arc::new(fake_server.create_api_client())),
303 config: Default::default(),
304 })
305 }
306}
307
308impl Deref for TestServer {
309 type Target = Server;
310
311 fn deref(&self) -> &Self::Target {
312 &self.server
313 }
314}
315
316impl Drop for TestServer {
317 fn drop(&mut self) {
318 self.server.teardown();
319 self.test_live_kit_server.teardown().unwrap();
320 }
321}
322
/// A connected client as produced by `TestServer::create_client`, bundled
/// with the per-client state the tests need.
///
/// Dereferences to the wrapped `Arc<Client>`.
struct TestClient {
    /// The RPC client; torn down when the `TestClient` is dropped.
    client: Arc<Client>,
    /// Login name the client was created with; used as the prefix of the
    /// directories returned by `create_new_root_dir`.
    username: String,
    /// Starts out empty and is not modified in this file; presumably filled in
    /// by the tests — TODO confirm.
    local_projects: Vec<ModelHandle<Project>>,
    /// Starts out empty and is not modified in this file; presumably filled in
    /// by the tests — TODO confirm.
    remote_projects: Vec<ModelHandle<Project>>,
    /// Counter consumed by `create_new_root_dir` to number root directories.
    next_root_dir_id: usize,
    /// User store backing the contact and current-user helpers below.
    pub user_store: ModelHandle<UserStore>,
    /// Language registry handed to the projects this client builds or joins.
    language_registry: Arc<LanguageRegistry>,
    /// Fake file system handed to the projects this client builds or joins.
    fs: Arc<FakeFs>,
    /// Buffers grouped by project. Starts out empty and is not modified in
    /// this file; presumably filled in by the tests — TODO confirm.
    buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
}
334
335impl Deref for TestClient {
336 type Target = Arc<Client>;
337
338 fn deref(&self) -> &Self::Target {
339 &self.client
340 }
341}
342
/// Snapshot of a client's contact lists, as GitHub logins, produced by
/// `TestClient::summarize_contacts`.
struct ContactsSummary {
    /// Logins of the client's current contacts.
    pub current: Vec<String>,
    /// Logins of the users in the client's outgoing contact requests.
    pub outgoing_requests: Vec<String>,
    /// Logins of the users in the client's incoming contact requests.
    pub incoming_requests: Vec<String>,
}
348
349impl TestClient {
350 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
351 UserId::from_proto(
352 self.user_store
353 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
354 )
355 }
356
357 async fn wait_for_current_user(&self, cx: &TestAppContext) {
358 let mut authed_user = self
359 .user_store
360 .read_with(cx, |user_store, _| user_store.watch_current_user());
361 while authed_user.next().await.unwrap().is_none() {}
362 }
363
364 async fn clear_contacts(&self, cx: &mut TestAppContext) {
365 self.user_store
366 .update(cx, |store, _| store.clear_contacts())
367 .await;
368 }
369
370 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
371 self.user_store.read_with(cx, |store, _| ContactsSummary {
372 current: store
373 .contacts()
374 .iter()
375 .map(|contact| contact.user.github_login.clone())
376 .collect(),
377 outgoing_requests: store
378 .outgoing_contact_requests()
379 .iter()
380 .map(|user| user.github_login.clone())
381 .collect(),
382 incoming_requests: store
383 .incoming_contact_requests()
384 .iter()
385 .map(|user| user.github_login.clone())
386 .collect(),
387 })
388 }
389
390 async fn build_local_project(
391 &self,
392 root_path: impl AsRef<Path>,
393 cx: &mut TestAppContext,
394 ) -> (ModelHandle<Project>, WorktreeId) {
395 let project = cx.update(|cx| {
396 Project::local(
397 self.client.clone(),
398 self.user_store.clone(),
399 self.language_registry.clone(),
400 self.fs.clone(),
401 cx,
402 )
403 });
404 let (worktree, _) = project
405 .update(cx, |p, cx| {
406 p.find_or_create_local_worktree(root_path, true, cx)
407 })
408 .await
409 .unwrap();
410 worktree
411 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
412 .await;
413 (project, worktree.read_with(cx, |tree, _| tree.id()))
414 }
415
416 async fn build_remote_project(
417 &self,
418 host_project_id: u64,
419 guest_cx: &mut TestAppContext,
420 ) -> ModelHandle<Project> {
421 let active_call = guest_cx.read(ActiveCall::global);
422 let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone());
423 room.update(guest_cx, |room, cx| {
424 room.join_project(
425 host_project_id,
426 self.language_registry.clone(),
427 self.fs.clone(),
428 cx,
429 )
430 })
431 .await
432 .unwrap()
433 }
434
435 fn build_workspace(
436 &self,
437 project: &ModelHandle<Project>,
438 cx: &mut TestAppContext,
439 ) -> ViewHandle<Workspace> {
440 let (_, root_view) = cx.add_window(|_| EmptyView);
441 cx.add_view(&root_view, |cx| {
442 Workspace::new(
443 Default::default(),
444 0,
445 project.clone(),
446 |_, _| unimplemented!(),
447 cx,
448 )
449 })
450 }
451
452 fn create_new_root_dir(&mut self) -> PathBuf {
453 format!(
454 "/{}-root-{}",
455 self.username,
456 util::post_inc(&mut self.next_root_dir_id)
457 )
458 .into()
459 }
460}
461
462impl Drop for TestClient {
463 fn drop(&mut self) {
464 self.client.teardown();
465 }
466}