1use crate::{
2 db::{tests::TestDb, NewUserParams, UserId},
3 executor::Executor,
4 rpc::{Server, CLEANUP_TIMEOUT},
5 AppState,
6};
7use anyhow::anyhow;
8use call::{ActiveCall, Room};
9use channel::ChannelStore;
10use client::{
11 self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore,
12};
13use collections::{HashMap, HashSet};
14use fs::FakeFs;
15use futures::{channel::oneshot, StreamExt as _};
16use gpui::{executor::Deterministic, ModelHandle, Task, TestAppContext, WindowHandle};
17use language::LanguageRegistry;
18use parking_lot::Mutex;
19use project::{Project, WorktreeId};
20use settings::SettingsStore;
21use std::{
22 cell::{Ref, RefCell, RefMut},
23 env,
24 ops::{Deref, DerefMut},
25 path::Path,
26 sync::{
27 atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
28 Arc,
29 },
30};
31use util::http::FakeHttpClient;
32use workspace::Workspace;
33
34mod channel_buffer_tests;
35mod channel_tests;
36mod integration_tests;
37mod randomized_integration_tests;
38
39struct TestServer {
40 app_state: Arc<AppState>,
41 server: Arc<Server>,
42 connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
43 forbid_connections: Arc<AtomicBool>,
44 _test_db: TestDb,
45 test_live_kit_server: Arc<live_kit_client::TestServer>,
46}
47
48impl TestServer {
49 async fn start(deterministic: &Arc<Deterministic>) -> Self {
50 static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
51
52 let use_postgres = env::var("USE_POSTGRES").ok();
53 let use_postgres = use_postgres.as_deref();
54 let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
55 TestDb::postgres(deterministic.build_background())
56 } else {
57 TestDb::sqlite(deterministic.build_background())
58 };
59 let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
60 let live_kit_server = live_kit_client::TestServer::create(
61 format!("http://livekit.{}.test", live_kit_server_id),
62 format!("devkey-{}", live_kit_server_id),
63 format!("secret-{}", live_kit_server_id),
64 deterministic.build_background(),
65 )
66 .unwrap();
67 let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
68 let epoch = app_state
69 .db
70 .create_server(&app_state.config.zed_environment)
71 .await
72 .unwrap();
73 let server = Server::new(
74 epoch,
75 app_state.clone(),
76 Executor::Deterministic(deterministic.build_background()),
77 );
78 server.start().await.unwrap();
79 // Advance clock to ensure the server's cleanup task is finished.
80 deterministic.advance_clock(CLEANUP_TIMEOUT);
81 Self {
82 app_state,
83 server,
84 connection_killers: Default::default(),
85 forbid_connections: Default::default(),
86 _test_db: test_db,
87 test_live_kit_server: live_kit_server,
88 }
89 }
90
91 async fn reset(&self) {
92 self.app_state.db.reset();
93 let epoch = self
94 .app_state
95 .db
96 .create_server(&self.app_state.config.zed_environment)
97 .await
98 .unwrap();
99 self.server.reset(epoch);
100 }
101
102 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
103 cx.update(|cx| {
104 if cx.has_global::<SettingsStore>() {
105 panic!("Same cx used to create two test clients")
106 }
107 cx.set_global(SettingsStore::test(cx));
108 });
109
110 let http = FakeHttpClient::with_404_response();
111 let user_id = if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await
112 {
113 user.id
114 } else {
115 self.app_state
116 .db
117 .create_user(
118 &format!("{name}@example.com"),
119 false,
120 NewUserParams {
121 github_login: name.into(),
122 github_user_id: 0,
123 invite_count: 0,
124 },
125 )
126 .await
127 .expect("creating user failed")
128 .user_id
129 };
130 let client_name = name.to_string();
131 let mut client = cx.read(|cx| Client::new(http.clone(), cx));
132 let server = self.server.clone();
133 let db = self.app_state.db.clone();
134 let connection_killers = self.connection_killers.clone();
135 let forbid_connections = self.forbid_connections.clone();
136
137 Arc::get_mut(&mut client)
138 .unwrap()
139 .set_id(user_id.0 as usize)
140 .override_authenticate(move |cx| {
141 cx.spawn(|_| async move {
142 let access_token = "the-token".to_string();
143 Ok(Credentials {
144 user_id: user_id.0 as u64,
145 access_token,
146 })
147 })
148 })
149 .override_establish_connection(move |credentials, cx| {
150 assert_eq!(credentials.user_id, user_id.0 as u64);
151 assert_eq!(credentials.access_token, "the-token");
152
153 let server = server.clone();
154 let db = db.clone();
155 let connection_killers = connection_killers.clone();
156 let forbid_connections = forbid_connections.clone();
157 let client_name = client_name.clone();
158 cx.spawn(move |cx| async move {
159 if forbid_connections.load(SeqCst) {
160 Err(EstablishConnectionError::other(anyhow!(
161 "server is forbidding connections"
162 )))
163 } else {
164 let (client_conn, server_conn, killed) =
165 Connection::in_memory(cx.background());
166 let (connection_id_tx, connection_id_rx) = oneshot::channel();
167 let user = db
168 .get_user_by_id(user_id)
169 .await
170 .expect("retrieving user failed")
171 .unwrap();
172 cx.background()
173 .spawn(server.handle_connection(
174 server_conn,
175 client_name,
176 user,
177 Some(connection_id_tx),
178 Executor::Deterministic(cx.background()),
179 ))
180 .detach();
181 let connection_id = connection_id_rx.await.unwrap();
182 connection_killers
183 .lock()
184 .insert(connection_id.into(), killed);
185 Ok(client_conn)
186 }
187 })
188 });
189
190 let fs = FakeFs::new(cx.background());
191 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
192 let channel_store =
193 cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
194 let app_state = Arc::new(workspace::AppState {
195 client: client.clone(),
196 user_store: user_store.clone(),
197 channel_store: channel_store.clone(),
198 languages: Arc::new(LanguageRegistry::test()),
199 fs: fs.clone(),
200 build_window_options: |_, _, _| Default::default(),
201 initialize_workspace: |_, _, _, _| Task::ready(Ok(())),
202 background_actions: || &[],
203 });
204
205 cx.update(|cx| {
206 theme::init((), cx);
207 Project::init(&client, cx);
208 client::init(&client, cx);
209 language::init(cx);
210 editor::init_settings(cx);
211 workspace::init(app_state.clone(), cx);
212 audio::init((), cx);
213 call::init(client.clone(), user_store.clone(), cx);
214 });
215
216 client
217 .authenticate_and_connect(false, &cx.to_async())
218 .await
219 .unwrap();
220
221 let client = TestClient {
222 app_state,
223 username: name.to_string(),
224 state: Default::default(),
225 };
226 client.wait_for_current_user(cx).await;
227 client
228 }
229
230 fn disconnect_client(&self, peer_id: PeerId) {
231 self.connection_killers
232 .lock()
233 .remove(&peer_id)
234 .unwrap()
235 .store(true, SeqCst);
236 }
237
238 fn forbid_connections(&self) {
239 self.forbid_connections.store(true, SeqCst);
240 }
241
242 fn allow_connections(&self) {
243 self.forbid_connections.store(false, SeqCst);
244 }
245
246 async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
247 for ix in 1..clients.len() {
248 let (left, right) = clients.split_at_mut(ix);
249 let (client_a, cx_a) = left.last_mut().unwrap();
250 for (client_b, cx_b) in right {
251 client_a
252 .app_state
253 .user_store
254 .update(*cx_a, |store, cx| {
255 store.request_contact(client_b.user_id().unwrap(), cx)
256 })
257 .await
258 .unwrap();
259 cx_a.foreground().run_until_parked();
260 client_b
261 .app_state
262 .user_store
263 .update(*cx_b, |store, cx| {
264 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
265 })
266 .await
267 .unwrap();
268 }
269 }
270 }
271
272 async fn make_channel(
273 &self,
274 channel: &str,
275 admin: (&TestClient, &mut TestAppContext),
276 members: &mut [(&TestClient, &mut TestAppContext)],
277 ) -> u64 {
278 let (admin_client, admin_cx) = admin;
279 let channel_id = admin_client
280 .app_state
281 .channel_store
282 .update(admin_cx, |channel_store, cx| {
283 channel_store.create_channel(channel, None, cx)
284 })
285 .await
286 .unwrap();
287
288 for (member_client, member_cx) in members {
289 admin_client
290 .app_state
291 .channel_store
292 .update(admin_cx, |channel_store, cx| {
293 channel_store.invite_member(
294 channel_id,
295 member_client.user_id().unwrap(),
296 false,
297 cx,
298 )
299 })
300 .await
301 .unwrap();
302
303 admin_cx.foreground().run_until_parked();
304
305 member_client
306 .app_state
307 .channel_store
308 .update(*member_cx, |channels, _| {
309 channels.respond_to_channel_invite(channel_id, true)
310 })
311 .await
312 .unwrap();
313 }
314
315 channel_id
316 }
317
318 async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
319 self.make_contacts(clients).await;
320
321 let (left, right) = clients.split_at_mut(1);
322 let (_client_a, cx_a) = &mut left[0];
323 let active_call_a = cx_a.read(ActiveCall::global);
324
325 for (client_b, cx_b) in right {
326 let user_id_b = client_b.current_user_id(*cx_b).to_proto();
327 active_call_a
328 .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx))
329 .await
330 .unwrap();
331
332 cx_b.foreground().run_until_parked();
333 let active_call_b = cx_b.read(ActiveCall::global);
334 active_call_b
335 .update(*cx_b, |call, cx| call.accept_incoming(cx))
336 .await
337 .unwrap();
338 }
339 }
340
341 async fn build_app_state(
342 test_db: &TestDb,
343 fake_server: &live_kit_client::TestServer,
344 ) -> Arc<AppState> {
345 Arc::new(AppState {
346 db: test_db.db().clone(),
347 live_kit_client: Some(Arc::new(fake_server.create_api_client())),
348 config: Default::default(),
349 })
350 }
351}
352
353impl Deref for TestServer {
354 type Target = Server;
355
356 fn deref(&self) -> &Self::Target {
357 &self.server
358 }
359}
360
361impl Drop for TestServer {
362 fn drop(&mut self) {
363 self.server.teardown();
364 self.test_live_kit_server.teardown().unwrap();
365 }
366}
367
368struct TestClient {
369 username: String,
370 state: RefCell<TestClientState>,
371 app_state: Arc<workspace::AppState>,
372}
373
374#[derive(Default)]
375struct TestClientState {
376 local_projects: Vec<ModelHandle<Project>>,
377 remote_projects: Vec<ModelHandle<Project>>,
378 buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
379}
380
381impl Deref for TestClient {
382 type Target = Arc<Client>;
383
384 fn deref(&self) -> &Self::Target {
385 &self.app_state.client
386 }
387}
388
389struct ContactsSummary {
390 pub current: Vec<String>,
391 pub outgoing_requests: Vec<String>,
392 pub incoming_requests: Vec<String>,
393}
394
395impl TestClient {
396 pub fn fs(&self) -> &FakeFs {
397 self.app_state.fs.as_fake()
398 }
399
400 pub fn channel_store(&self) -> &ModelHandle<ChannelStore> {
401 &self.app_state.channel_store
402 }
403
404 pub fn user_store(&self) -> &ModelHandle<UserStore> {
405 &self.app_state.user_store
406 }
407
408 pub fn language_registry(&self) -> &Arc<LanguageRegistry> {
409 &self.app_state.languages
410 }
411
412 pub fn client(&self) -> &Arc<Client> {
413 &self.app_state.client
414 }
415
416 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
417 UserId::from_proto(
418 self.app_state
419 .user_store
420 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
421 )
422 }
423
424 async fn wait_for_current_user(&self, cx: &TestAppContext) {
425 let mut authed_user = self
426 .app_state
427 .user_store
428 .read_with(cx, |user_store, _| user_store.watch_current_user());
429 while authed_user.next().await.unwrap().is_none() {}
430 }
431
432 async fn clear_contacts(&self, cx: &mut TestAppContext) {
433 self.app_state
434 .user_store
435 .update(cx, |store, _| store.clear_contacts())
436 .await;
437 }
438
439 fn local_projects<'a>(&'a self) -> impl Deref<Target = Vec<ModelHandle<Project>>> + 'a {
440 Ref::map(self.state.borrow(), |state| &state.local_projects)
441 }
442
443 fn remote_projects<'a>(&'a self) -> impl Deref<Target = Vec<ModelHandle<Project>>> + 'a {
444 Ref::map(self.state.borrow(), |state| &state.remote_projects)
445 }
446
447 fn local_projects_mut<'a>(&'a self) -> impl DerefMut<Target = Vec<ModelHandle<Project>>> + 'a {
448 RefMut::map(self.state.borrow_mut(), |state| &mut state.local_projects)
449 }
450
451 fn remote_projects_mut<'a>(&'a self) -> impl DerefMut<Target = Vec<ModelHandle<Project>>> + 'a {
452 RefMut::map(self.state.borrow_mut(), |state| &mut state.remote_projects)
453 }
454
455 fn buffers_for_project<'a>(
456 &'a self,
457 project: &ModelHandle<Project>,
458 ) -> impl DerefMut<Target = HashSet<ModelHandle<language::Buffer>>> + 'a {
459 RefMut::map(self.state.borrow_mut(), |state| {
460 state.buffers.entry(project.clone()).or_default()
461 })
462 }
463
464 fn buffers<'a>(
465 &'a self,
466 ) -> impl DerefMut<Target = HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>> + 'a
467 {
468 RefMut::map(self.state.borrow_mut(), |state| &mut state.buffers)
469 }
470
471 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
472 self.app_state
473 .user_store
474 .read_with(cx, |store, _| ContactsSummary {
475 current: store
476 .contacts()
477 .iter()
478 .map(|contact| contact.user.github_login.clone())
479 .collect(),
480 outgoing_requests: store
481 .outgoing_contact_requests()
482 .iter()
483 .map(|user| user.github_login.clone())
484 .collect(),
485 incoming_requests: store
486 .incoming_contact_requests()
487 .iter()
488 .map(|user| user.github_login.clone())
489 .collect(),
490 })
491 }
492
493 async fn build_local_project(
494 &self,
495 root_path: impl AsRef<Path>,
496 cx: &mut TestAppContext,
497 ) -> (ModelHandle<Project>, WorktreeId) {
498 let project = cx.update(|cx| {
499 Project::local(
500 self.client().clone(),
501 self.app_state.user_store.clone(),
502 self.app_state.languages.clone(),
503 self.app_state.fs.clone(),
504 cx,
505 )
506 });
507 let (worktree, _) = project
508 .update(cx, |p, cx| {
509 p.find_or_create_local_worktree(root_path, true, cx)
510 })
511 .await
512 .unwrap();
513 worktree
514 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
515 .await;
516 (project, worktree.read_with(cx, |tree, _| tree.id()))
517 }
518
519 async fn build_remote_project(
520 &self,
521 host_project_id: u64,
522 guest_cx: &mut TestAppContext,
523 ) -> ModelHandle<Project> {
524 let active_call = guest_cx.read(ActiveCall::global);
525 let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone());
526 room.update(guest_cx, |room, cx| {
527 room.join_project(
528 host_project_id,
529 self.app_state.languages.clone(),
530 self.app_state.fs.clone(),
531 cx,
532 )
533 })
534 .await
535 .unwrap()
536 }
537
538 fn build_workspace(
539 &self,
540 project: &ModelHandle<Project>,
541 cx: &mut TestAppContext,
542 ) -> WindowHandle<Workspace> {
543 cx.add_window(|cx| Workspace::new(0, project.clone(), self.app_state.clone(), cx))
544 }
545}
546
547impl Drop for TestClient {
548 fn drop(&mut self) {
549 self.app_state.client.teardown();
550 }
551}
552
553#[derive(Debug, Eq, PartialEq)]
554struct RoomParticipants {
555 remote: Vec<String>,
556 pending: Vec<String>,
557}
558
559fn room_participants(room: &ModelHandle<Room>, cx: &mut TestAppContext) -> RoomParticipants {
560 room.read_with(cx, |room, _| {
561 let mut remote = room
562 .remote_participants()
563 .iter()
564 .map(|(_, participant)| participant.user.github_login.clone())
565 .collect::<Vec<_>>();
566 let mut pending = room
567 .pending_participants()
568 .iter()
569 .map(|user| user.github_login.clone())
570 .collect::<Vec<_>>();
571 remote.sort();
572 pending.sort();
573 RoomParticipants { remote, pending }
574 })
575}