1use crate::{
2 admin, auth, github,
3 rpc::{self, add_rpc_routes},
4 AppState, Config,
5};
6use async_std::task;
7use gpui::TestAppContext;
8use rand::prelude::*;
9use serde_json::json;
10use sqlx::{
11 migrate::{MigrateDatabase, Migrator},
12 postgres::PgPoolOptions,
13 Executor as _, Postgres,
14};
15use std::{path::Path, sync::Arc};
16use zed::{
17 editor::Editor,
18 fs::{FakeFs, Fs as _},
19 language::LanguageRegistry,
20 rpc::Client,
21 settings,
22 test::Channel,
23 worktree::Worktree,
24};
25use zrpc::{ForegroundRouter, Peer, Router};
26
27#[gpui::test]
28async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
29 let (window_b, _) = cx_b.add_window(|_| EmptyView);
30 let settings = settings::channel(&cx_b.font_cache()).unwrap().1;
31 let lang_registry = Arc::new(LanguageRegistry::new());
32
33 // Connect to a server as 2 clients.
34 let mut server = TestServer::start().await;
35 let client_a = server.create_client(&mut cx_a, "user_a").await;
36 let client_b = server.create_client(&mut cx_b, "user_b").await;
37
38 cx_a.foreground().forbid_parking();
39
40 // Share a local worktree as client A
41 let fs = Arc::new(FakeFs::new());
42 fs.insert_tree(
43 "/a",
44 json!({
45 "a.txt": "a-contents",
46 "b.txt": "b-contents",
47 }),
48 )
49 .await;
50 let worktree_a = Worktree::open_local(
51 "/a".as_ref(),
52 lang_registry.clone(),
53 fs,
54 &mut cx_a.to_async(),
55 )
56 .await
57 .unwrap();
58 worktree_a
59 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
60 .await;
61 let (worktree_id, worktree_token) = worktree_a
62 .update(&mut cx_a, |tree, cx| {
63 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
64 })
65 .await
66 .unwrap();
67
68 // Join that worktree as client B, and see that a guest has joined as client A.
69 let worktree_b = Worktree::open_remote(
70 client_b.clone(),
71 worktree_id,
72 worktree_token,
73 lang_registry.clone(),
74 &mut cx_b.to_async(),
75 )
76 .await
77 .unwrap();
78 let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
79 worktree_a
80 .condition(&cx_a, |tree, _| {
81 tree.peers()
82 .values()
83 .any(|replica_id| *replica_id == replica_id_b)
84 })
85 .await;
86
87 // Open the same file as client B and client A.
88 let buffer_b = worktree_b
89 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
90 .await
91 .unwrap();
92 buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
93 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
94 let buffer_a = worktree_a
95 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
96 .await
97 .unwrap();
98
99 // Create a selection set as client B and see that selection set as client A.
100 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, settings, cx));
101 buffer_a
102 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
103 .await;
104
105 // Edit the buffer as client B and see that edit as client A.
106 editor_b.update(&mut cx_b, |editor, cx| {
107 editor.insert(&"ok, ".to_string(), cx)
108 });
109 buffer_a
110 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
111 .await;
112
113 // Remove the selection set as client B, see those selections disappear as client A.
114 cx_b.update(move |_| drop(editor_b));
115 buffer_a
116 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
117 .await;
118
119 // Close the buffer as client A, see that the buffer is closed.
120 drop(buffer_a);
121 worktree_a
122 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
123 .await;
124
125 // Dropping the worktree removes client B from client A's peers.
126 cx_b.update(move |_| drop(worktree_b));
127 worktree_a
128 .condition(&cx_a, |tree, _| tree.peers().is_empty())
129 .await;
130}
131
132#[gpui::test]
133async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
134 mut cx_a: TestAppContext,
135 mut cx_b: TestAppContext,
136 mut cx_c: TestAppContext,
137) {
138 let lang_registry = Arc::new(LanguageRegistry::new());
139
140 // Connect to a server as 3 clients.
141 let mut server = TestServer::start().await;
142 let client_a = server.create_client(&mut cx_a, "user_a").await;
143 let client_b = server.create_client(&mut cx_b, "user_b").await;
144 let client_c = server.create_client(&mut cx_c, "user_c").await;
145
146 cx_a.foreground().forbid_parking();
147
148 let fs = Arc::new(FakeFs::new());
149
150 // Share a worktree as client A.
151 fs.insert_tree(
152 "/a",
153 json!({
154 "file1": "",
155 "file2": ""
156 }),
157 )
158 .await;
159
160 let worktree_a = Worktree::open_local(
161 "/a".as_ref(),
162 lang_registry.clone(),
163 fs.clone(),
164 &mut cx_a.to_async(),
165 )
166 .await
167 .unwrap();
168 worktree_a
169 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
170 .await;
171 let (worktree_id, worktree_token) = worktree_a
172 .update(&mut cx_a, |tree, cx| {
173 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
174 })
175 .await
176 .unwrap();
177
178 // Join that worktree as clients B and C.
179 let worktree_b = Worktree::open_remote(
180 client_b.clone(),
181 worktree_id,
182 worktree_token.clone(),
183 lang_registry.clone(),
184 &mut cx_b.to_async(),
185 )
186 .await
187 .unwrap();
188 let worktree_c = Worktree::open_remote(
189 client_c.clone(),
190 worktree_id,
191 worktree_token,
192 lang_registry.clone(),
193 &mut cx_c.to_async(),
194 )
195 .await
196 .unwrap();
197
198 // Open and edit a buffer as both guests B and C.
199 let buffer_b = worktree_b
200 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
201 .await
202 .unwrap();
203 let buffer_c = worktree_c
204 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
205 .await
206 .unwrap();
207 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
208 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
209
210 // Open and edit that buffer as the host.
211 let buffer_a = worktree_a
212 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
213 .await
214 .unwrap();
215
216 buffer_a
217 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
218 .await;
219 buffer_a.update(&mut cx_a, |buf, cx| {
220 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
221 });
222
223 // Wait for edits to propagate
224 buffer_a
225 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
226 .await;
227 buffer_b
228 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
229 .await;
230 buffer_c
231 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
232 .await;
233
234 // Edit the buffer as the host and concurrently save as guest B.
235 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
236 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
237 save_b.await.unwrap();
238 assert_eq!(
239 fs.load("/a/file1".as_ref()).await.unwrap(),
240 "hi-a, i-am-c, i-am-b, i-am-a"
241 );
242 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
243 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
244 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
245
246 // Make changes on host's file system, see those changes on the guests.
247 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
248 .await
249 .unwrap();
250 fs.insert_file(Path::new("/a/file4"), "4".into())
251 .await
252 .unwrap();
253
254 worktree_b
255 .condition(&cx_b, |tree, _| tree.file_count() == 3)
256 .await;
257 worktree_c
258 .condition(&cx_c, |tree, _| tree.file_count() == 3)
259 .await;
260 worktree_b.read_with(&cx_b, |tree, _| {
261 assert_eq!(
262 tree.paths()
263 .map(|p| p.to_string_lossy())
264 .collect::<Vec<_>>(),
265 &["file1", "file3", "file4"]
266 )
267 });
268 worktree_c.read_with(&cx_c, |tree, _| {
269 assert_eq!(
270 tree.paths()
271 .map(|p| p.to_string_lossy())
272 .collect::<Vec<_>>(),
273 &["file1", "file3", "file4"]
274 )
275 });
276}
277
278#[gpui::test]
279async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
280 let lang_registry = Arc::new(LanguageRegistry::new());
281
282 // Connect to a server as 2 clients.
283 let mut server = TestServer::start().await;
284 let client_a = server.create_client(&mut cx_a, "user_a").await;
285 let client_b = server.create_client(&mut cx_b, "user_b").await;
286
287 cx_a.foreground().forbid_parking();
288
289 // Share a local worktree as client A
290 let fs = Arc::new(FakeFs::new());
291 fs.save(Path::new("/a.txt"), &"a-contents".into())
292 .await
293 .unwrap();
294 let worktree_a = Worktree::open_local(
295 "/".as_ref(),
296 lang_registry.clone(),
297 fs,
298 &mut cx_a.to_async(),
299 )
300 .await
301 .unwrap();
302 worktree_a
303 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
304 .await;
305 let (worktree_id, worktree_token) = worktree_a
306 .update(&mut cx_a, |tree, cx| {
307 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
308 })
309 .await
310 .unwrap();
311
312 // Join that worktree as client B, and see that a guest has joined as client A.
313 let worktree_b = Worktree::open_remote(
314 client_b.clone(),
315 worktree_id,
316 worktree_token,
317 lang_registry.clone(),
318 &mut cx_b.to_async(),
319 )
320 .await
321 .unwrap();
322
323 let buffer_b = worktree_b
324 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
325 .await
326 .unwrap();
327 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime);
328
329 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
330 buffer_b.read_with(&cx_b, |buf, _| {
331 assert!(buf.is_dirty());
332 assert!(!buf.has_conflict());
333 });
334
335 buffer_b
336 .update(&mut cx_b, |buf, cx| buf.save(cx))
337 .unwrap()
338 .await
339 .unwrap();
340 worktree_b
341 .condition(&cx_b, |_, cx| {
342 buffer_b.read(cx).file().unwrap().mtime != mtime
343 })
344 .await;
345 buffer_b.read_with(&cx_b, |buf, _| {
346 assert!(!buf.is_dirty());
347 assert!(!buf.has_conflict());
348 });
349
350 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
351 buffer_b.read_with(&cx_b, |buf, _| {
352 assert!(buf.is_dirty());
353 assert!(!buf.has_conflict());
354 });
355}
356
357#[gpui::test]
358async fn test_editing_while_guest_opens_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
359 let lang_registry = Arc::new(LanguageRegistry::new());
360
361 // Connect to a server as 2 clients.
362 let mut server = TestServer::start().await;
363 let client_a = server.create_client(&mut cx_a, "user_a").await;
364 let client_b = server.create_client(&mut cx_b, "user_b").await;
365
366 cx_a.foreground().forbid_parking();
367
368 // Share a local worktree as client A
369 let fs = Arc::new(FakeFs::new());
370 fs.save(Path::new("/a.txt"), &"a-contents".into())
371 .await
372 .unwrap();
373 let worktree_a = Worktree::open_local(
374 "/".as_ref(),
375 lang_registry.clone(),
376 fs,
377 &mut cx_a.to_async(),
378 )
379 .await
380 .unwrap();
381 worktree_a
382 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
383 .await;
384 let (worktree_id, worktree_token) = worktree_a
385 .update(&mut cx_a, |tree, cx| {
386 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
387 })
388 .await
389 .unwrap();
390
391 // Join that worktree as client B, and see that a guest has joined as client A.
392 let worktree_b = Worktree::open_remote(
393 client_b.clone(),
394 worktree_id,
395 worktree_token,
396 lang_registry.clone(),
397 &mut cx_b.to_async(),
398 )
399 .await
400 .unwrap();
401
402 let buffer_a = worktree_a
403 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
404 .await
405 .unwrap();
406 let buffer_b = cx_b
407 .background()
408 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
409
410 task::yield_now().await;
411 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
412
413 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
414 let buffer_b = buffer_b.await.unwrap();
415 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
416}
417
418#[gpui::test]
419async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
420 let lang_registry = Arc::new(LanguageRegistry::new());
421
422 // Connect to a server as 2 clients.
423 let mut server = TestServer::start().await;
424 let client_a = server.create_client(&mut cx_a, "user_a").await;
425 let client_b = server.create_client(&mut cx_a, "user_b").await;
426
427 cx_a.foreground().forbid_parking();
428
429 // Share a local worktree as client A
430 let fs = Arc::new(FakeFs::new());
431 fs.insert_tree(
432 "/a",
433 json!({
434 "a.txt": "a-contents",
435 "b.txt": "b-contents",
436 }),
437 )
438 .await;
439 let worktree_a = Worktree::open_local(
440 "/a".as_ref(),
441 lang_registry.clone(),
442 fs,
443 &mut cx_a.to_async(),
444 )
445 .await
446 .unwrap();
447 worktree_a
448 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
449 .await;
450 let (worktree_id, worktree_token) = worktree_a
451 .update(&mut cx_a, |tree, cx| {
452 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
453 })
454 .await
455 .unwrap();
456
457 // Join that worktree as client B, and see that a guest has joined as client A.
458 let _worktree_b = Worktree::open_remote(
459 client_b.clone(),
460 worktree_id,
461 worktree_token,
462 lang_registry.clone(),
463 &mut cx_b.to_async(),
464 )
465 .await
466 .unwrap();
467 worktree_a
468 .condition(&cx_a, |tree, _| tree.peers().len() == 1)
469 .await;
470
471 // Drop client B's connection and ensure client A observes client B leaving the worktree.
472 client_b.disconnect().await.unwrap();
473 worktree_a
474 .condition(&cx_a, |tree, _| tree.peers().len() == 0)
475 .await;
476}
477
478struct TestServer {
479 peer: Arc<Peer>,
480 app_state: Arc<AppState>,
481 db_name: String,
482 router: Arc<Router>,
483}
484
485impl TestServer {
486 async fn start() -> Self {
487 let mut rng = StdRng::from_entropy();
488 let db_name = format!("zed-test-{}", rng.gen::<u128>());
489 let app_state = Self::build_app_state(&db_name).await;
490 let peer = Peer::new();
491 let mut router = Router::new();
492 add_rpc_routes(&mut router, &app_state, &peer);
493 Self {
494 peer,
495 router: Arc::new(router),
496 app_state,
497 db_name,
498 }
499 }
500
501 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> Client {
502 let user_id = admin::create_user(&self.app_state.db, name, false)
503 .await
504 .unwrap();
505 let lang_registry = Arc::new(LanguageRegistry::new());
506 let client = Client::new(lang_registry.clone());
507 let mut client_router = ForegroundRouter::new();
508 cx.update(|cx| zed::worktree::init(cx, &client, &mut client_router));
509
510 let (client_conn, server_conn) = Channel::bidirectional();
511 cx.background()
512 .spawn(rpc::handle_connection(
513 self.peer.clone(),
514 self.router.clone(),
515 self.app_state.clone(),
516 name.to_string(),
517 server_conn,
518 user_id,
519 ))
520 .detach();
521 client
522 .add_connection(client_conn, Arc::new(client_router), cx.to_async())
523 .await
524 .unwrap();
525
526 // Reset the executor because running SQL queries has a non-deterministic impact on it.
527 cx.foreground().reset();
528 client
529 }
530
531 async fn build_app_state(db_name: &str) -> Arc<AppState> {
532 let mut config = Config::default();
533 config.session_secret = "a".repeat(32);
534 config.database_url = format!("postgres://postgres@localhost/{}", db_name);
535
536 Self::create_db(&config.database_url).await;
537 let db = PgPoolOptions::new()
538 .max_connections(5)
539 .connect(&config.database_url)
540 .await
541 .expect("failed to connect to postgres database");
542 let migrator = Migrator::new(Path::new(concat!(
543 env!("CARGO_MANIFEST_DIR"),
544 "/migrations"
545 )))
546 .await
547 .unwrap();
548 migrator.run(&db).await.unwrap();
549
550 let github_client = github::AppClient::test();
551 Arc::new(AppState {
552 db,
553 handlebars: Default::default(),
554 auth_client: auth::build_client("", ""),
555 repo_client: github::RepoClient::test(&github_client),
556 github_client,
557 rpc: Default::default(),
558 config,
559 })
560 }
561
562 async fn create_db(url: &str) {
563 // Enable tests to run in parallel by serializing the creation of each test database.
564 lazy_static::lazy_static! {
565 static ref DB_CREATION: async_std::sync::Mutex<()> = async_std::sync::Mutex::new(());
566 }
567
568 let _lock = DB_CREATION.lock().await;
569 Postgres::create_database(url)
570 .await
571 .expect("failed to create test database");
572 }
573}
574
575impl Drop for TestServer {
576 fn drop(&mut self) {
577 task::block_on(async {
578 self.peer.reset().await;
579 self.app_state
580 .db
581 .execute(
582 format!(
583 "
584 SELECT pg_terminate_backend(pg_stat_activity.pid)
585 FROM pg_stat_activity
586 WHERE pg_stat_activity.datname = '{}' AND pid <> pg_backend_pid();",
587 self.db_name,
588 )
589 .as_str(),
590 )
591 .await
592 .unwrap();
593 self.app_state.db.close().await;
594 Postgres::drop_database(&self.app_state.config.database_url)
595 .await
596 .unwrap();
597 });
598 }
599}
600
601struct EmptyView;
602
603impl gpui::Entity for EmptyView {
604 type Event = ();
605}
606
607impl gpui::View for EmptyView {
608 fn ui_name() -> &'static str {
609 "empty view"
610 }
611
612 fn render<'a>(&self, _: &gpui::AppContext) -> gpui::ElementBox {
613 gpui::Element::boxed(gpui::elements::Empty)
614 }
615}