1use crate::{
2 admin, auth, github,
3 rpc::{self, add_rpc_routes},
4 AppState, Config,
5};
6use async_std::task;
7use gpui::TestAppContext;
8use rand::prelude::*;
9use serde_json::json;
10use sqlx::{
11 migrate::{MigrateDatabase, Migrator},
12 postgres::PgPoolOptions,
13 Executor as _, Postgres,
14};
15use std::{path::Path, sync::Arc};
16use zed::{
17 editor::Editor,
18 fs::{FakeFs, Fs as _},
19 language::LanguageRegistry,
20 rpc::Client,
21 settings,
22 test::Channel,
23 worktree::Worktree,
24};
25use zrpc::{ForegroundRouter, Peer, Router};
26
27#[gpui::test]
28async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
29 let (window_b, _) = cx_b.add_window(|_| EmptyView);
30 let settings = settings::channel(&cx_b.font_cache()).unwrap().1;
31 let lang_registry = Arc::new(LanguageRegistry::new());
32
33 // Connect to a server as 2 clients.
34 let mut server = TestServer::start().await;
35 let client_a = server.create_client(&mut cx_a, "user_a").await;
36 let client_b = server.create_client(&mut cx_b, "user_b").await;
37
38 // Share a local worktree as client A
39 let fs = Arc::new(FakeFs::new());
40 fs.insert_tree(
41 "/a",
42 json!({
43 "a.txt": "a-contents",
44 "b.txt": "b-contents",
45 }),
46 )
47 .await;
48 let worktree_a = Worktree::open_local(
49 "/a".as_ref(),
50 lang_registry.clone(),
51 fs,
52 &mut cx_a.to_async(),
53 )
54 .await
55 .unwrap();
56 worktree_a
57 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
58 .await;
59 let (worktree_id, worktree_token) = worktree_a
60 .update(&mut cx_a, |tree, cx| {
61 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
62 })
63 .await
64 .unwrap();
65
66 // Join that worktree as client B, and see that a guest has joined as client A.
67 let worktree_b = Worktree::open_remote(
68 client_b.clone(),
69 worktree_id,
70 worktree_token,
71 lang_registry.clone(),
72 &mut cx_b.to_async(),
73 )
74 .await
75 .unwrap();
76 let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
77 worktree_a
78 .condition(&cx_a, |tree, _| {
79 tree.peers()
80 .values()
81 .any(|replica_id| *replica_id == replica_id_b)
82 })
83 .await;
84
85 // Open the same file as client B and client A.
86 let buffer_b = worktree_b
87 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
88 .await
89 .unwrap();
90 buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
91 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
92 let buffer_a = worktree_a
93 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
94 .await
95 .unwrap();
96
97 // Create a selection set as client B and see that selection set as client A.
98 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, settings, cx));
99 buffer_a
100 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
101 .await;
102
103 // Edit the buffer as client B and see that edit as client A.
104 editor_b.update(&mut cx_b, |editor, cx| {
105 editor.insert(&"ok, ".to_string(), cx)
106 });
107 buffer_a
108 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
109 .await;
110
111 // Remove the selection set as client B, see those selections disappear as client A.
112 cx_b.update(move |_| drop(editor_b));
113 buffer_a
114 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
115 .await;
116
117 // Close the buffer as client A, see that the buffer is closed.
118 drop(buffer_a);
119 worktree_a
120 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
121 .await;
122
123 // Dropping the worktree removes client B from client A's peers.
124 cx_b.update(move |_| drop(worktree_b));
125 worktree_a
126 .condition(&cx_a, |tree, _| tree.peers().is_empty())
127 .await;
128}
129
130#[gpui::test]
131async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
132 mut cx_a: TestAppContext,
133 mut cx_b: TestAppContext,
134 mut cx_c: TestAppContext,
135) {
136 let lang_registry = Arc::new(LanguageRegistry::new());
137
138 // Connect to a server as 3 clients.
139 let mut server = TestServer::start().await;
140 let client_a = server.create_client(&mut cx_a, "user_a").await;
141 let client_b = server.create_client(&mut cx_b, "user_b").await;
142 let client_c = server.create_client(&mut cx_c, "user_c").await;
143
144 let fs = Arc::new(FakeFs::new());
145
146 // Share a worktree as client A.
147 fs.insert_tree(
148 "/a",
149 json!({
150 "file1": "",
151 "file2": ""
152 }),
153 )
154 .await;
155
156 let worktree_a = Worktree::open_local(
157 "/a".as_ref(),
158 lang_registry.clone(),
159 fs.clone(),
160 &mut cx_a.to_async(),
161 )
162 .await
163 .unwrap();
164 worktree_a
165 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
166 .await;
167 let (worktree_id, worktree_token) = worktree_a
168 .update(&mut cx_a, |tree, cx| {
169 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
170 })
171 .await
172 .unwrap();
173
174 // Join that worktree as clients B and C.
175 let worktree_b = Worktree::open_remote(
176 client_b.clone(),
177 worktree_id,
178 worktree_token.clone(),
179 lang_registry.clone(),
180 &mut cx_b.to_async(),
181 )
182 .await
183 .unwrap();
184 let worktree_c = Worktree::open_remote(
185 client_c.clone(),
186 worktree_id,
187 worktree_token,
188 lang_registry.clone(),
189 &mut cx_c.to_async(),
190 )
191 .await
192 .unwrap();
193
194 // Open and edit a buffer as both guests B and C.
195 let buffer_b = worktree_b
196 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
197 .await
198 .unwrap();
199 let buffer_c = worktree_c
200 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
201 .await
202 .unwrap();
203 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
204 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
205
206 // Open and edit that buffer as the host.
207 let buffer_a = worktree_a
208 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
209 .await
210 .unwrap();
211
212 buffer_a
213 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
214 .await;
215 buffer_a.update(&mut cx_a, |buf, cx| {
216 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
217 });
218
219 // Wait for edits to propagate
220 buffer_a
221 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
222 .await;
223 buffer_b
224 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
225 .await;
226 buffer_c
227 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
228 .await;
229
230 // Edit the buffer as the host and concurrently save as guest B.
231 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
232 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
233 save_b.await.unwrap();
234 assert_eq!(
235 fs.load("/a/file1".as_ref()).await.unwrap(),
236 "hi-a, i-am-c, i-am-b, i-am-a"
237 );
238 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
239 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
240 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
241
242 // Make changes on host's file system, see those changes on the guests.
243 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
244 .await
245 .unwrap();
246 fs.insert_file(Path::new("/a/file4"), "4".into())
247 .await
248 .unwrap();
249
250 worktree_b
251 .condition(&cx_b, |tree, _| tree.file_count() == 3)
252 .await;
253 worktree_c
254 .condition(&cx_c, |tree, _| tree.file_count() == 3)
255 .await;
256 worktree_b.read_with(&cx_b, |tree, _| {
257 assert_eq!(
258 tree.paths()
259 .map(|p| p.to_string_lossy())
260 .collect::<Vec<_>>(),
261 &["file1", "file3", "file4"]
262 )
263 });
264 worktree_c.read_with(&cx_c, |tree, _| {
265 assert_eq!(
266 tree.paths()
267 .map(|p| p.to_string_lossy())
268 .collect::<Vec<_>>(),
269 &["file1", "file3", "file4"]
270 )
271 });
272}
273
274#[gpui::test]
275async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
276 let lang_registry = Arc::new(LanguageRegistry::new());
277
278 // Connect to a server as 2 clients.
279 let mut server = TestServer::start().await;
280 let client_a = server.create_client(&mut cx_a, "user_a").await;
281 let client_b = server.create_client(&mut cx_b, "user_b").await;
282
283 // Share a local worktree as client A
284 let fs = Arc::new(FakeFs::new());
285 fs.save(Path::new("/a.txt"), &"a-contents".into())
286 .await
287 .unwrap();
288 let worktree_a = Worktree::open_local(
289 "/".as_ref(),
290 lang_registry.clone(),
291 fs,
292 &mut cx_a.to_async(),
293 )
294 .await
295 .unwrap();
296 worktree_a
297 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
298 .await;
299 let (worktree_id, worktree_token) = worktree_a
300 .update(&mut cx_a, |tree, cx| {
301 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
302 })
303 .await
304 .unwrap();
305
306 // Join that worktree as client B, and see that a guest has joined as client A.
307 let worktree_b = Worktree::open_remote(
308 client_b.clone(),
309 worktree_id,
310 worktree_token,
311 lang_registry.clone(),
312 &mut cx_b.to_async(),
313 )
314 .await
315 .unwrap();
316
317 let buffer_b = worktree_b
318 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
319 .await
320 .unwrap();
321 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime);
322
323 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
324 buffer_b.read_with(&cx_b, |buf, _| {
325 assert!(buf.is_dirty());
326 assert!(!buf.has_conflict());
327 });
328
329 buffer_b
330 .update(&mut cx_b, |buf, cx| buf.save(cx))
331 .unwrap()
332 .await
333 .unwrap();
334 worktree_b
335 .condition(&cx_b, |_, cx| {
336 buffer_b.read(cx).file().unwrap().mtime != mtime
337 })
338 .await;
339 buffer_b.read_with(&cx_b, |buf, _| {
340 assert!(!buf.is_dirty());
341 assert!(!buf.has_conflict());
342 });
343
344 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
345 buffer_b.read_with(&cx_b, |buf, _| {
346 assert!(buf.is_dirty());
347 assert!(!buf.has_conflict());
348 });
349}
350
351#[gpui::test]
352async fn test_editing_while_guest_opens_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
353 let lang_registry = Arc::new(LanguageRegistry::new());
354
355 // Connect to a server as 2 clients.
356 let mut server = TestServer::start().await;
357 let client_a = server.create_client(&mut cx_a, "user_a").await;
358 let client_b = server.create_client(&mut cx_b, "user_b").await;
359
360 // Share a local worktree as client A
361 let fs = Arc::new(FakeFs::new());
362 fs.save(Path::new("/a.txt"), &"a-contents".into())
363 .await
364 .unwrap();
365 let worktree_a = Worktree::open_local(
366 "/".as_ref(),
367 lang_registry.clone(),
368 fs,
369 &mut cx_a.to_async(),
370 )
371 .await
372 .unwrap();
373 worktree_a
374 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
375 .await;
376 let (worktree_id, worktree_token) = worktree_a
377 .update(&mut cx_a, |tree, cx| {
378 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
379 })
380 .await
381 .unwrap();
382
383 // Join that worktree as client B, and see that a guest has joined as client A.
384 let worktree_b = Worktree::open_remote(
385 client_b.clone(),
386 worktree_id,
387 worktree_token,
388 lang_registry.clone(),
389 &mut cx_b.to_async(),
390 )
391 .await
392 .unwrap();
393
394 let buffer_a = worktree_a
395 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
396 .await
397 .unwrap();
398 let buffer_b = cx_b
399 .background()
400 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
401
402 task::yield_now().await;
403 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
404
405 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
406 let buffer_b = buffer_b.await.unwrap();
407 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
408}
409
410#[gpui::test]
411async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
412 let lang_registry = Arc::new(LanguageRegistry::new());
413
414 // Connect to a server as 2 clients.
415 let mut server = TestServer::start().await;
416 let client_a = server.create_client(&mut cx_a, "user_a").await;
417 let client_b = server.create_client(&mut cx_a, "user_b").await;
418
419 // Share a local worktree as client A
420 let fs = Arc::new(FakeFs::new());
421 fs.insert_tree(
422 "/a",
423 json!({
424 "a.txt": "a-contents",
425 "b.txt": "b-contents",
426 }),
427 )
428 .await;
429 let worktree_a = Worktree::open_local(
430 "/a".as_ref(),
431 lang_registry.clone(),
432 fs,
433 &mut cx_a.to_async(),
434 )
435 .await
436 .unwrap();
437 worktree_a
438 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
439 .await;
440 let (worktree_id, worktree_token) = worktree_a
441 .update(&mut cx_a, |tree, cx| {
442 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
443 })
444 .await
445 .unwrap();
446
447 // Join that worktree as client B, and see that a guest has joined as client A.
448 let _worktree_b = Worktree::open_remote(
449 client_b.clone(),
450 worktree_id,
451 worktree_token,
452 lang_registry.clone(),
453 &mut cx_b.to_async(),
454 )
455 .await
456 .unwrap();
457 worktree_a
458 .condition(&cx_a, |tree, _| tree.peers().len() == 1)
459 .await;
460
461 // Drop client B's connection and ensure client A observes client B leaving the worktree.
462 client_b.disconnect().await.unwrap();
463 worktree_a
464 .condition(&cx_a, |tree, _| tree.peers().len() == 0)
465 .await;
466}
467
/// In-process server used by the tests in this file.
///
/// Created with `TestServer::start`, which provisions a dedicated Postgres database; the `Drop`
/// implementation removes that database again.
struct TestServer {
    /// Peer passed to `add_rpc_routes` and to every `rpc::handle_connection` task.
    peer: Arc<Peer>,
    /// Shared application state, including the database pool and the config.
    app_state: Arc<AppState>,
    /// Name of the per-test database, used on drop to find sessions connected to it.
    db_name: String,
    /// Router with the RPC routes registered, shared by all connection handlers.
    router: Arc<Router>,
}
474
475impl TestServer {
476 async fn start() -> Self {
477 let mut rng = StdRng::from_entropy();
478 let db_name = format!("zed-test-{}", rng.gen::<u128>());
479 let app_state = Self::build_app_state(&db_name).await;
480 let peer = Peer::new();
481 let mut router = Router::new();
482 add_rpc_routes(&mut router, &app_state, &peer);
483 Self {
484 peer,
485 router: Arc::new(router),
486 app_state,
487 db_name,
488 }
489 }
490
491 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> Client {
492 let user_id = admin::create_user(&self.app_state.db, name, false)
493 .await
494 .unwrap();
495 let lang_registry = Arc::new(LanguageRegistry::new());
496 let client = Client::new(lang_registry.clone());
497 let mut client_router = ForegroundRouter::new();
498 cx.update(|cx| zed::worktree::init(cx, &client, &mut client_router));
499
500 let (client_conn, server_conn) = Channel::bidirectional();
501 cx.background()
502 .spawn(rpc::handle_connection(
503 self.peer.clone(),
504 self.router.clone(),
505 self.app_state.clone(),
506 name.to_string(),
507 server_conn,
508 user_id,
509 ))
510 .detach();
511 client
512 .add_connection(client_conn, Arc::new(client_router), cx.to_async())
513 .await
514 .unwrap();
515
516 // Reset the executor because running SQL queries has a non-deterministic impact on it.
517 cx.foreground().reset();
518 client
519 }
520
521 async fn build_app_state(db_name: &str) -> Arc<AppState> {
522 let mut config = Config::default();
523 config.session_secret = "a".repeat(32);
524 config.database_url = format!("postgres://postgres@localhost/{}", db_name);
525
526 Self::create_db(&config.database_url).await;
527 let db = PgPoolOptions::new()
528 .max_connections(5)
529 .connect(&config.database_url)
530 .await
531 .expect("failed to connect to postgres database");
532 let migrator = Migrator::new(Path::new("./migrations")).await.unwrap();
533 migrator.run(&db).await.unwrap();
534
535 let github_client = github::AppClient::test();
536 Arc::new(AppState {
537 db,
538 handlebars: Default::default(),
539 auth_client: auth::build_client("", ""),
540 repo_client: github::RepoClient::test(&github_client),
541 github_client,
542 rpc: Default::default(),
543 config,
544 })
545 }
546
547 async fn create_db(url: &str) {
548 // Enable tests to run in parallel by serializing the creation of each test database.
549 lazy_static::lazy_static! {
550 static ref DB_CREATION: async_std::sync::Mutex<()> = async_std::sync::Mutex::new(());
551 }
552
553 let _lock = DB_CREATION.lock().await;
554 Postgres::create_database(url)
555 .await
556 .expect("failed to create test database");
557 }
558}
559
560impl Drop for TestServer {
561 fn drop(&mut self) {
562 task::block_on(async {
563 self.peer.reset().await;
564 self.app_state
565 .db
566 .execute(
567 format!(
568 "
569 SELECT pg_terminate_backend(pg_stat_activity.pid)
570 FROM pg_stat_activity
571 WHERE pg_stat_activity.datname = '{}' AND pid <> pg_backend_pid();",
572 self.db_name,
573 )
574 .as_str(),
575 )
576 .await
577 .unwrap();
578 self.app_state.db.close().await;
579 Postgres::drop_database(&self.app_state.config.database_url)
580 .await
581 .unwrap();
582 });
583 }
584}
585
/// Minimal view with no state, used as the root view when a test needs a window.
struct EmptyView;

impl gpui::Entity for EmptyView {
    // The view never emits events.
    type Event = ();
}

impl gpui::View for EmptyView {
    /// Name under which this view identifies itself.
    fn ui_name() -> &'static str {
        "empty view"
    }

    /// Renders a boxed `gpui::elements::Empty` element.
    fn render<'a>(&self, _: &gpui::AppContext) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty)
    }
}