1use crate::{
2 auth, db, github,
3 rpc::{self, add_rpc_routes},
4 AppState, Config,
5};
6use async_std::task;
7use gpui::TestAppContext;
8use rand::prelude::*;
9use serde_json::json;
10use sqlx::{
11 migrate::{MigrateDatabase, Migrator},
12 Executor as _, Postgres,
13};
14use std::{path::Path, sync::Arc};
15use zed::{
16 editor::Editor,
17 fs::{FakeFs, Fs as _},
18 language::LanguageRegistry,
19 rpc::Client,
20 settings,
21 test::Channel,
22 worktree::Worktree,
23};
24use zrpc::{ForegroundRouter, Peer, Router};
25
26#[gpui::test]
27async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
28 let (window_b, _) = cx_b.add_window(|_| EmptyView);
29 let settings = settings::channel(&cx_b.font_cache()).unwrap().1;
30 let lang_registry = Arc::new(LanguageRegistry::new());
31
32 // Connect to a server as 2 clients.
33 let mut server = TestServer::start().await;
34 let client_a = server.create_client(&mut cx_a, "user_a").await;
35 let client_b = server.create_client(&mut cx_b, "user_b").await;
36
37 cx_a.foreground().forbid_parking();
38
39 // Share a local worktree as client A
40 let fs = Arc::new(FakeFs::new());
41 fs.insert_tree(
42 "/a",
43 json!({
44 "a.txt": "a-contents",
45 "b.txt": "b-contents",
46 }),
47 )
48 .await;
49 let worktree_a = Worktree::open_local(
50 "/a".as_ref(),
51 lang_registry.clone(),
52 fs,
53 &mut cx_a.to_async(),
54 )
55 .await
56 .unwrap();
57 worktree_a
58 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
59 .await;
60 let (worktree_id, worktree_token) = worktree_a
61 .update(&mut cx_a, |tree, cx| {
62 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
63 })
64 .await
65 .unwrap();
66
67 // Join that worktree as client B, and see that a guest has joined as client A.
68 let worktree_b = Worktree::open_remote(
69 client_b.clone(),
70 worktree_id,
71 worktree_token,
72 lang_registry.clone(),
73 &mut cx_b.to_async(),
74 )
75 .await
76 .unwrap();
77 let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
78 worktree_a
79 .condition(&cx_a, |tree, _| {
80 tree.peers()
81 .values()
82 .any(|replica_id| *replica_id == replica_id_b)
83 })
84 .await;
85
86 // Open the same file as client B and client A.
87 let buffer_b = worktree_b
88 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
89 .await
90 .unwrap();
91 buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
92 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
93 let buffer_a = worktree_a
94 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
95 .await
96 .unwrap();
97
98 // Create a selection set as client B and see that selection set as client A.
99 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, settings, cx));
100 buffer_a
101 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
102 .await;
103
104 // Edit the buffer as client B and see that edit as client A.
105 editor_b.update(&mut cx_b, |editor, cx| {
106 editor.insert(&"ok, ".to_string(), cx)
107 });
108 buffer_a
109 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
110 .await;
111
112 // Remove the selection set as client B, see those selections disappear as client A.
113 cx_b.update(move |_| drop(editor_b));
114 buffer_a
115 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
116 .await;
117
118 // Close the buffer as client A, see that the buffer is closed.
119 drop(buffer_a);
120 worktree_a
121 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
122 .await;
123
124 // Dropping the worktree removes client B from client A's peers.
125 cx_b.update(move |_| drop(worktree_b));
126 worktree_a
127 .condition(&cx_a, |tree, _| tree.peers().is_empty())
128 .await;
129}
130
131#[gpui::test]
132async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
133 mut cx_a: TestAppContext,
134 mut cx_b: TestAppContext,
135 mut cx_c: TestAppContext,
136) {
137 let lang_registry = Arc::new(LanguageRegistry::new());
138
139 // Connect to a server as 3 clients.
140 let mut server = TestServer::start().await;
141 let client_a = server.create_client(&mut cx_a, "user_a").await;
142 let client_b = server.create_client(&mut cx_b, "user_b").await;
143 let client_c = server.create_client(&mut cx_c, "user_c").await;
144
145 cx_a.foreground().forbid_parking();
146
147 let fs = Arc::new(FakeFs::new());
148
149 // Share a worktree as client A.
150 fs.insert_tree(
151 "/a",
152 json!({
153 "file1": "",
154 "file2": ""
155 }),
156 )
157 .await;
158
159 let worktree_a = Worktree::open_local(
160 "/a".as_ref(),
161 lang_registry.clone(),
162 fs.clone(),
163 &mut cx_a.to_async(),
164 )
165 .await
166 .unwrap();
167 worktree_a
168 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
169 .await;
170 let (worktree_id, worktree_token) = worktree_a
171 .update(&mut cx_a, |tree, cx| {
172 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
173 })
174 .await
175 .unwrap();
176
177 // Join that worktree as clients B and C.
178 let worktree_b = Worktree::open_remote(
179 client_b.clone(),
180 worktree_id,
181 worktree_token.clone(),
182 lang_registry.clone(),
183 &mut cx_b.to_async(),
184 )
185 .await
186 .unwrap();
187 let worktree_c = Worktree::open_remote(
188 client_c.clone(),
189 worktree_id,
190 worktree_token,
191 lang_registry.clone(),
192 &mut cx_c.to_async(),
193 )
194 .await
195 .unwrap();
196
197 // Open and edit a buffer as both guests B and C.
198 let buffer_b = worktree_b
199 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
200 .await
201 .unwrap();
202 let buffer_c = worktree_c
203 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
204 .await
205 .unwrap();
206 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
207 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
208
209 // Open and edit that buffer as the host.
210 let buffer_a = worktree_a
211 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
212 .await
213 .unwrap();
214
215 buffer_a
216 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
217 .await;
218 buffer_a.update(&mut cx_a, |buf, cx| {
219 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
220 });
221
222 // Wait for edits to propagate
223 buffer_a
224 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
225 .await;
226 buffer_b
227 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
228 .await;
229 buffer_c
230 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
231 .await;
232
233 // Edit the buffer as the host and concurrently save as guest B.
234 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
235 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
236 save_b.await.unwrap();
237 assert_eq!(
238 fs.load("/a/file1".as_ref()).await.unwrap(),
239 "hi-a, i-am-c, i-am-b, i-am-a"
240 );
241 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
242 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
243 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
244
245 // Make changes on host's file system, see those changes on the guests.
246 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
247 .await
248 .unwrap();
249 fs.insert_file(Path::new("/a/file4"), "4".into())
250 .await
251 .unwrap();
252
253 worktree_b
254 .condition(&cx_b, |tree, _| tree.file_count() == 3)
255 .await;
256 worktree_c
257 .condition(&cx_c, |tree, _| tree.file_count() == 3)
258 .await;
259 worktree_b.read_with(&cx_b, |tree, _| {
260 assert_eq!(
261 tree.paths()
262 .map(|p| p.to_string_lossy())
263 .collect::<Vec<_>>(),
264 &["file1", "file3", "file4"]
265 )
266 });
267 worktree_c.read_with(&cx_c, |tree, _| {
268 assert_eq!(
269 tree.paths()
270 .map(|p| p.to_string_lossy())
271 .collect::<Vec<_>>(),
272 &["file1", "file3", "file4"]
273 )
274 });
275}
276
277#[gpui::test]
278async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
279 let lang_registry = Arc::new(LanguageRegistry::new());
280
281 // Connect to a server as 2 clients.
282 let mut server = TestServer::start().await;
283 let client_a = server.create_client(&mut cx_a, "user_a").await;
284 let client_b = server.create_client(&mut cx_b, "user_b").await;
285
286 cx_a.foreground().forbid_parking();
287
288 // Share a local worktree as client A
289 let fs = Arc::new(FakeFs::new());
290 fs.save(Path::new("/a.txt"), &"a-contents".into())
291 .await
292 .unwrap();
293 let worktree_a = Worktree::open_local(
294 "/".as_ref(),
295 lang_registry.clone(),
296 fs,
297 &mut cx_a.to_async(),
298 )
299 .await
300 .unwrap();
301 worktree_a
302 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
303 .await;
304 let (worktree_id, worktree_token) = worktree_a
305 .update(&mut cx_a, |tree, cx| {
306 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
307 })
308 .await
309 .unwrap();
310
311 // Join that worktree as client B, and see that a guest has joined as client A.
312 let worktree_b = Worktree::open_remote(
313 client_b.clone(),
314 worktree_id,
315 worktree_token,
316 lang_registry.clone(),
317 &mut cx_b.to_async(),
318 )
319 .await
320 .unwrap();
321
322 let buffer_b = worktree_b
323 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
324 .await
325 .unwrap();
326 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime);
327
328 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
329 buffer_b.read_with(&cx_b, |buf, _| {
330 assert!(buf.is_dirty());
331 assert!(!buf.has_conflict());
332 });
333
334 buffer_b
335 .update(&mut cx_b, |buf, cx| buf.save(cx))
336 .unwrap()
337 .await
338 .unwrap();
339 worktree_b
340 .condition(&cx_b, |_, cx| {
341 buffer_b.read(cx).file().unwrap().mtime != mtime
342 })
343 .await;
344 buffer_b.read_with(&cx_b, |buf, _| {
345 assert!(!buf.is_dirty());
346 assert!(!buf.has_conflict());
347 });
348
349 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
350 buffer_b.read_with(&cx_b, |buf, _| {
351 assert!(buf.is_dirty());
352 assert!(!buf.has_conflict());
353 });
354}
355
356#[gpui::test]
357async fn test_editing_while_guest_opens_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
358 let lang_registry = Arc::new(LanguageRegistry::new());
359
360 // Connect to a server as 2 clients.
361 let mut server = TestServer::start().await;
362 let client_a = server.create_client(&mut cx_a, "user_a").await;
363 let client_b = server.create_client(&mut cx_b, "user_b").await;
364
365 cx_a.foreground().forbid_parking();
366
367 // Share a local worktree as client A
368 let fs = Arc::new(FakeFs::new());
369 fs.save(Path::new("/a.txt"), &"a-contents".into())
370 .await
371 .unwrap();
372 let worktree_a = Worktree::open_local(
373 "/".as_ref(),
374 lang_registry.clone(),
375 fs,
376 &mut cx_a.to_async(),
377 )
378 .await
379 .unwrap();
380 worktree_a
381 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
382 .await;
383 let (worktree_id, worktree_token) = worktree_a
384 .update(&mut cx_a, |tree, cx| {
385 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
386 })
387 .await
388 .unwrap();
389
390 // Join that worktree as client B, and see that a guest has joined as client A.
391 let worktree_b = Worktree::open_remote(
392 client_b.clone(),
393 worktree_id,
394 worktree_token,
395 lang_registry.clone(),
396 &mut cx_b.to_async(),
397 )
398 .await
399 .unwrap();
400
401 let buffer_a = worktree_a
402 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
403 .await
404 .unwrap();
405 let buffer_b = cx_b
406 .background()
407 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
408
409 task::yield_now().await;
410 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
411
412 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
413 let buffer_b = buffer_b.await.unwrap();
414 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
415}
416
417#[gpui::test]
418async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
419 let lang_registry = Arc::new(LanguageRegistry::new());
420
421 // Connect to a server as 2 clients.
422 let mut server = TestServer::start().await;
423 let client_a = server.create_client(&mut cx_a, "user_a").await;
424 let client_b = server.create_client(&mut cx_a, "user_b").await;
425
426 cx_a.foreground().forbid_parking();
427
428 // Share a local worktree as client A
429 let fs = Arc::new(FakeFs::new());
430 fs.insert_tree(
431 "/a",
432 json!({
433 "a.txt": "a-contents",
434 "b.txt": "b-contents",
435 }),
436 )
437 .await;
438 let worktree_a = Worktree::open_local(
439 "/a".as_ref(),
440 lang_registry.clone(),
441 fs,
442 &mut cx_a.to_async(),
443 )
444 .await
445 .unwrap();
446 worktree_a
447 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
448 .await;
449 let (worktree_id, worktree_token) = worktree_a
450 .update(&mut cx_a, |tree, cx| {
451 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
452 })
453 .await
454 .unwrap();
455
456 // Join that worktree as client B, and see that a guest has joined as client A.
457 let _worktree_b = Worktree::open_remote(
458 client_b.clone(),
459 worktree_id,
460 worktree_token,
461 lang_registry.clone(),
462 &mut cx_b.to_async(),
463 )
464 .await
465 .unwrap();
466 worktree_a
467 .condition(&cx_a, |tree, _| tree.peers().len() == 1)
468 .await;
469
470 // Drop client B's connection and ensure client A observes client B leaving the worktree.
471 client_b.disconnect().await.unwrap();
472 worktree_a
473 .condition(&cx_a, |tree, _| tree.peers().len() == 0)
474 .await;
475}
476
477struct TestServer {
478 peer: Arc<Peer>,
479 app_state: Arc<AppState>,
480 db_name: String,
481 router: Arc<Router>,
482}
483
484impl TestServer {
485 async fn start() -> Self {
486 let mut rng = StdRng::from_entropy();
487 let db_name = format!("zed-test-{}", rng.gen::<u128>());
488 let app_state = Self::build_app_state(&db_name).await;
489 let peer = Peer::new();
490 let mut router = Router::new();
491 add_rpc_routes(&mut router, &app_state, &peer);
492 Self {
493 peer,
494 router: Arc::new(router),
495 app_state,
496 db_name,
497 }
498 }
499
500 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> Client {
501 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
502 let lang_registry = Arc::new(LanguageRegistry::new());
503 let client = Client::new(lang_registry.clone());
504 let mut client_router = ForegroundRouter::new();
505 cx.update(|cx| zed::worktree::init(cx, &client, &mut client_router));
506
507 let (client_conn, server_conn) = Channel::bidirectional();
508 cx.background()
509 .spawn(rpc::handle_connection(
510 self.peer.clone(),
511 self.router.clone(),
512 self.app_state.clone(),
513 name.to_string(),
514 server_conn,
515 user_id,
516 ))
517 .detach();
518 client
519 .add_connection(client_conn, Arc::new(client_router), cx.to_async())
520 .await
521 .unwrap();
522
523 client
524 }
525
526 async fn build_app_state(db_name: &str) -> Arc<AppState> {
527 let mut config = Config::default();
528 config.session_secret = "a".repeat(32);
529 config.database_url = format!("postgres://postgres@localhost/{}", db_name);
530
531 Self::create_db(&config.database_url).await;
532 let db = db::Db(
533 db::DbOptions::new()
534 .max_connections(5)
535 .connect(&config.database_url)
536 .await
537 .expect("failed to connect to postgres database"),
538 );
539 let migrator = Migrator::new(Path::new(concat!(
540 env!("CARGO_MANIFEST_DIR"),
541 "/migrations"
542 )))
543 .await
544 .unwrap();
545 migrator.run(&db.0).await.unwrap();
546
547 let github_client = github::AppClient::test();
548 Arc::new(AppState {
549 db,
550 handlebars: Default::default(),
551 auth_client: auth::build_client("", ""),
552 repo_client: github::RepoClient::test(&github_client),
553 github_client,
554 rpc: Default::default(),
555 config,
556 })
557 }
558
559 async fn create_db(url: &str) {
560 // Enable tests to run in parallel by serializing the creation of each test database.
561 lazy_static::lazy_static! {
562 static ref DB_CREATION: async_std::sync::Mutex<()> = async_std::sync::Mutex::new(());
563 }
564
565 let _lock = DB_CREATION.lock().await;
566 Postgres::create_database(url)
567 .await
568 .expect("failed to create test database");
569 }
570}
571
572impl Drop for TestServer {
573 fn drop(&mut self) {
574 task::block_on(async {
575 self.peer.reset().await;
576 self.app_state
577 .db
578 .execute(
579 format!(
580 "
581 SELECT pg_terminate_backend(pg_stat_activity.pid)
582 FROM pg_stat_activity
583 WHERE pg_stat_activity.datname = '{}' AND pid <> pg_backend_pid();",
584 self.db_name,
585 )
586 .as_str(),
587 )
588 .await
589 .unwrap();
590 self.app_state.db.close().await;
591 Postgres::drop_database(&self.app_state.config.database_url)
592 .await
593 .unwrap();
594 });
595 }
596}
597
598struct EmptyView;
599
600impl gpui::Entity for EmptyView {
601 type Event = ();
602}
603
604impl gpui::View for EmptyView {
605 fn ui_name() -> &'static str {
606 "empty view"
607 }
608
609 fn render<'a>(&self, _: &gpui::RenderContext<Self>) -> gpui::ElementBox {
610 gpui::Element::boxed(gpui::elements::Empty)
611 }
612}