1use crate::{
2 admin, auth, github,
3 rpc::{self, add_rpc_routes},
4 AppState, Config,
5};
6use async_std::task;
7use gpui::TestAppContext;
8use rand::prelude::*;
9use serde_json::json;
10use sqlx::{
11 migrate::{MigrateDatabase, Migrator},
12 postgres::PgPoolOptions,
13 Executor as _, Postgres,
14};
15use std::{path::Path, sync::Arc};
16use zed::{
17 editor::Editor,
18 language::LanguageRegistry,
19 rpc::Client,
20 settings,
21 test::Channel,
22 worktree::{FakeFs, Fs as _, Worktree},
23};
24use zrpc::{ForegroundRouter, Peer, Router};
25
26#[gpui::test]
27async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
28 let (window_b, _) = cx_b.add_window(|_| EmptyView);
29 let settings = settings::channel(&cx_b.font_cache()).unwrap().1;
30 let lang_registry = Arc::new(LanguageRegistry::new());
31
32 // Connect to a server as 2 clients.
33 let mut server = TestServer::start().await;
34 let client_a = server.create_client(&mut cx_a, "user_a").await;
35 let client_b = server.create_client(&mut cx_b, "user_b").await;
36
37 // Share a local worktree as client A
38 let fs = Arc::new(FakeFs::new());
39 fs.insert_tree(
40 "/a",
41 json!({
42 "a.txt": "a-contents",
43 "b.txt": "b-contents",
44 }),
45 )
46 .await;
47 let worktree_a = Worktree::open_local(
48 "/a".as_ref(),
49 lang_registry.clone(),
50 fs,
51 &mut cx_a.to_async(),
52 )
53 .await
54 .unwrap();
55 worktree_a
56 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
57 .await;
58 let (worktree_id, worktree_token) = worktree_a
59 .update(&mut cx_a, |tree, cx| {
60 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
61 })
62 .await
63 .unwrap();
64
65 // Join that worktree as client B, and see that a guest has joined as client A.
66 let worktree_b = Worktree::open_remote(
67 client_b.clone(),
68 worktree_id,
69 worktree_token,
70 lang_registry.clone(),
71 &mut cx_b.to_async(),
72 )
73 .await
74 .unwrap();
75 let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
76 worktree_a
77 .condition(&cx_a, |tree, _| {
78 tree.peers()
79 .values()
80 .any(|replica_id| *replica_id == replica_id_b)
81 })
82 .await;
83
84 // Open the same file as client B and client A.
85 let buffer_b = worktree_b
86 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
87 .await
88 .unwrap();
89 buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
90 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
91 let buffer_a = worktree_a
92 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
93 .await
94 .unwrap();
95
96 // Create a selection set as client B and see that selection set as client A.
97 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, settings, cx));
98 buffer_a
99 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
100 .await;
101
102 // Edit the buffer as client B and see that edit as client A.
103 editor_b.update(&mut cx_b, |editor, cx| {
104 editor.insert(&"ok, ".to_string(), cx)
105 });
106 buffer_a
107 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
108 .await;
109
110 // Remove the selection set as client B, see those selections disappear as client A.
111 cx_b.update(move |_| drop(editor_b));
112 buffer_a
113 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
114 .await;
115
116 // Close the buffer as client A, see that the buffer is closed.
117 drop(buffer_a);
118 worktree_a
119 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
120 .await;
121
122 // Dropping the worktree removes client B from client A's peers.
123 cx_b.update(move |_| drop(worktree_b));
124 worktree_a
125 .condition(&cx_a, |tree, _| tree.peers().is_empty())
126 .await;
127}
128
129#[gpui::test]
130async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
131 mut cx_a: TestAppContext,
132 mut cx_b: TestAppContext,
133 mut cx_c: TestAppContext,
134) {
135 let lang_registry = Arc::new(LanguageRegistry::new());
136
137 // Connect to a server as 3 clients.
138 let mut server = TestServer::start().await;
139 let client_a = server.create_client(&mut cx_a, "user_a").await;
140 let client_b = server.create_client(&mut cx_b, "user_b").await;
141 let client_c = server.create_client(&mut cx_c, "user_c").await;
142
143 let fs = Arc::new(FakeFs::new());
144
145 // Share a worktree as client A.
146 fs.insert_tree(
147 "/a",
148 json!({
149 "file1": "",
150 "file2": ""
151 }),
152 )
153 .await;
154
155 let worktree_a = Worktree::open_local(
156 "/a".as_ref(),
157 lang_registry.clone(),
158 fs.clone(),
159 &mut cx_a.to_async(),
160 )
161 .await
162 .unwrap();
163 worktree_a
164 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
165 .await;
166 let (worktree_id, worktree_token) = worktree_a
167 .update(&mut cx_a, |tree, cx| {
168 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
169 })
170 .await
171 .unwrap();
172
173 // Join that worktree as clients B and C.
174 let worktree_b = Worktree::open_remote(
175 client_b.clone(),
176 worktree_id,
177 worktree_token.clone(),
178 lang_registry.clone(),
179 &mut cx_b.to_async(),
180 )
181 .await
182 .unwrap();
183 let worktree_c = Worktree::open_remote(
184 client_c.clone(),
185 worktree_id,
186 worktree_token,
187 lang_registry.clone(),
188 &mut cx_c.to_async(),
189 )
190 .await
191 .unwrap();
192
193 // Open and edit a buffer as both guests B and C.
194 let buffer_b = worktree_b
195 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
196 .await
197 .unwrap();
198 let buffer_c = worktree_c
199 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
200 .await
201 .unwrap();
202 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
203 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
204
205 // Open and edit that buffer as the host.
206 let buffer_a = worktree_a
207 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
208 .await
209 .unwrap();
210
211 buffer_a
212 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
213 .await;
214 buffer_a.update(&mut cx_a, |buf, cx| {
215 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
216 });
217
218 // Wait for edits to propagate
219 buffer_a
220 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
221 .await;
222 buffer_b
223 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
224 .await;
225 buffer_c
226 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
227 .await;
228
229 // Edit the buffer as the host and concurrently save as guest B.
230 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
231 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
232 save_b.await.unwrap();
233 assert_eq!(
234 fs.load("/a/file1".as_ref()).await.unwrap(),
235 "hi-a, i-am-c, i-am-b, i-am-a"
236 );
237 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
238 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
239 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
240
241 // Make changes on host's file system, see those changes on the guests.
242 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
243 .await
244 .unwrap();
245 fs.insert_file(Path::new("/a/file4"), "4".into())
246 .await
247 .unwrap();
248
249 worktree_b
250 .condition(&cx_b, |tree, _| tree.file_count() == 3)
251 .await;
252 worktree_c
253 .condition(&cx_c, |tree, _| tree.file_count() == 3)
254 .await;
255 worktree_b.read_with(&cx_b, |tree, _| {
256 assert_eq!(
257 tree.paths()
258 .map(|p| p.to_string_lossy())
259 .collect::<Vec<_>>(),
260 &["file1", "file3", "file4"]
261 )
262 });
263 worktree_c.read_with(&cx_c, |tree, _| {
264 assert_eq!(
265 tree.paths()
266 .map(|p| p.to_string_lossy())
267 .collect::<Vec<_>>(),
268 &["file1", "file3", "file4"]
269 )
270 });
271}
272
273#[gpui::test]
274async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
275 let lang_registry = Arc::new(LanguageRegistry::new());
276
277 // Connect to a server as 2 clients.
278 let mut server = TestServer::start().await;
279 let client_a = server.create_client(&mut cx_a, "user_a").await;
280 let client_b = server.create_client(&mut cx_b, "user_b").await;
281
282 // Share a local worktree as client A
283 let fs = Arc::new(FakeFs::new());
284 fs.save(Path::new("/a.txt"), &"a-contents".into())
285 .await
286 .unwrap();
287 let worktree_a = Worktree::open_local(
288 "/".as_ref(),
289 lang_registry.clone(),
290 fs,
291 &mut cx_a.to_async(),
292 )
293 .await
294 .unwrap();
295 worktree_a
296 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
297 .await;
298 let (worktree_id, worktree_token) = worktree_a
299 .update(&mut cx_a, |tree, cx| {
300 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
301 })
302 .await
303 .unwrap();
304
305 // Join that worktree as client B, and see that a guest has joined as client A.
306 let worktree_b = Worktree::open_remote(
307 client_b.clone(),
308 worktree_id,
309 worktree_token,
310 lang_registry.clone(),
311 &mut cx_b.to_async(),
312 )
313 .await
314 .unwrap();
315
316 let buffer_b = worktree_b
317 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
318 .await
319 .unwrap();
320 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime);
321
322 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
323 buffer_b.read_with(&cx_b, |buf, _| {
324 assert!(buf.is_dirty());
325 assert!(!buf.has_conflict());
326 });
327
328 buffer_b
329 .update(&mut cx_b, |buf, cx| buf.save(cx))
330 .unwrap()
331 .await
332 .unwrap();
333 worktree_b
334 .condition(&cx_b, |_, cx| {
335 buffer_b.read(cx).file().unwrap().mtime != mtime
336 })
337 .await;
338 buffer_b.read_with(&cx_b, |buf, _| {
339 assert!(!buf.is_dirty());
340 assert!(!buf.has_conflict());
341 });
342
343 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
344 buffer_b.read_with(&cx_b, |buf, _| {
345 assert!(buf.is_dirty());
346 assert!(!buf.has_conflict());
347 });
348}
349
350#[gpui::test]
351async fn test_editing_while_guest_opens_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
352 let lang_registry = Arc::new(LanguageRegistry::new());
353
354 // Connect to a server as 2 clients.
355 let mut server = TestServer::start().await;
356 let client_a = server.create_client(&mut cx_a, "user_a").await;
357 let client_b = server.create_client(&mut cx_b, "user_b").await;
358
359 // Share a local worktree as client A
360 let fs = Arc::new(FakeFs::new());
361 fs.save(Path::new("/a.txt"), &"a-contents".into())
362 .await
363 .unwrap();
364 let worktree_a = Worktree::open_local(
365 "/".as_ref(),
366 lang_registry.clone(),
367 fs,
368 &mut cx_a.to_async(),
369 )
370 .await
371 .unwrap();
372 worktree_a
373 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
374 .await;
375 let (worktree_id, worktree_token) = worktree_a
376 .update(&mut cx_a, |tree, cx| {
377 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
378 })
379 .await
380 .unwrap();
381
382 // Join that worktree as client B, and see that a guest has joined as client A.
383 let worktree_b = Worktree::open_remote(
384 client_b.clone(),
385 worktree_id,
386 worktree_token,
387 lang_registry.clone(),
388 &mut cx_b.to_async(),
389 )
390 .await
391 .unwrap();
392
393 let buffer_a = worktree_a
394 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
395 .await
396 .unwrap();
397 let buffer_b = cx_b
398 .background()
399 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
400
401 task::yield_now().await;
402 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
403
404 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
405 let buffer_b = buffer_b.await.unwrap();
406 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
407}
408
409#[gpui::test]
410async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
411 let lang_registry = Arc::new(LanguageRegistry::new());
412
413 // Connect to a server as 2 clients.
414 let mut server = TestServer::start().await;
415 let client_a = server.create_client(&mut cx_a, "user_a").await;
416 let client_b = server.create_client(&mut cx_a, "user_b").await;
417
418 // Share a local worktree as client A
419 let fs = Arc::new(FakeFs::new());
420 fs.insert_tree(
421 "/a",
422 json!({
423 "a.txt": "a-contents",
424 "b.txt": "b-contents",
425 }),
426 )
427 .await;
428 let worktree_a = Worktree::open_local(
429 "/a".as_ref(),
430 lang_registry.clone(),
431 fs,
432 &mut cx_a.to_async(),
433 )
434 .await
435 .unwrap();
436 worktree_a
437 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
438 .await;
439 let (worktree_id, worktree_token) = worktree_a
440 .update(&mut cx_a, |tree, cx| {
441 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
442 })
443 .await
444 .unwrap();
445
446 // Join that worktree as client B, and see that a guest has joined as client A.
447 let _worktree_b = Worktree::open_remote(
448 client_b.clone(),
449 worktree_id,
450 worktree_token,
451 lang_registry.clone(),
452 &mut cx_b.to_async(),
453 )
454 .await
455 .unwrap();
456 worktree_a
457 .condition(&cx_a, |tree, _| tree.peers().len() == 1)
458 .await;
459
460 // Drop client B's connection and ensure client A observes client B leaving the worktree.
461 client_b.disconnect().await.unwrap();
462 worktree_a
463 .condition(&cx_a, |tree, _| tree.peers().len() == 0)
464 .await;
465}
466
467struct TestServer {
468 peer: Arc<Peer>,
469 app_state: Arc<AppState>,
470 db_name: String,
471 router: Arc<Router>,
472}
473
474impl TestServer {
475 async fn start() -> Self {
476 let mut rng = StdRng::from_entropy();
477 let db_name = format!("zed-test-{}", rng.gen::<u128>());
478 let app_state = Self::build_app_state(&db_name).await;
479 let peer = Peer::new();
480 let mut router = Router::new();
481 add_rpc_routes(&mut router, &app_state, &peer);
482 Self {
483 peer,
484 router: Arc::new(router),
485 app_state,
486 db_name,
487 }
488 }
489
490 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> Client {
491 let user_id = admin::create_user(&self.app_state.db, name, false)
492 .await
493 .unwrap();
494 let lang_registry = Arc::new(LanguageRegistry::new());
495 let client = Client::new(lang_registry.clone());
496 let mut client_router = ForegroundRouter::new();
497 cx.update(|cx| zed::worktree::init(cx, &client, &mut client_router));
498
499 let (client_conn, server_conn) = Channel::bidirectional();
500 cx.background()
501 .spawn(rpc::handle_connection(
502 self.peer.clone(),
503 self.router.clone(),
504 self.app_state.clone(),
505 name.to_string(),
506 server_conn,
507 user_id,
508 ))
509 .detach();
510 client
511 .add_connection(client_conn, Arc::new(client_router), cx.to_async())
512 .await
513 .unwrap();
514
515 // Reset the executor because running SQL queries has a non-deterministic impact on it.
516 cx.foreground().reset();
517 client
518 }
519
520 async fn build_app_state(db_name: &str) -> Arc<AppState> {
521 let mut config = Config::default();
522 config.session_secret = "a".repeat(32);
523 config.database_url = format!("postgres://postgres@localhost/{}", db_name);
524
525 Self::create_db(&config.database_url).await;
526 let db = PgPoolOptions::new()
527 .max_connections(5)
528 .connect(&config.database_url)
529 .await
530 .expect("failed to connect to postgres database");
531 let migrator = Migrator::new(Path::new("./migrations")).await.unwrap();
532 migrator.run(&db).await.unwrap();
533
534 let github_client = github::AppClient::test();
535 Arc::new(AppState {
536 db,
537 handlebars: Default::default(),
538 auth_client: auth::build_client("", ""),
539 repo_client: github::RepoClient::test(&github_client),
540 github_client,
541 rpc: Default::default(),
542 config,
543 })
544 }
545
546 async fn create_db(url: &str) {
547 // Enable tests to run in parallel by serializing the creation of each test database.
548 lazy_static::lazy_static! {
549 static ref DB_CREATION: async_std::sync::Mutex<()> = async_std::sync::Mutex::new(());
550 }
551
552 let _lock = DB_CREATION.lock().await;
553 Postgres::create_database(url)
554 .await
555 .expect("failed to create test database");
556 }
557}
558
559impl Drop for TestServer {
560 fn drop(&mut self) {
561 task::block_on(async {
562 self.peer.reset().await;
563 self.app_state
564 .db
565 .execute(
566 format!(
567 "
568 SELECT pg_terminate_backend(pg_stat_activity.pid)
569 FROM pg_stat_activity
570 WHERE pg_stat_activity.datname = '{}' AND pid <> pg_backend_pid();",
571 self.db_name,
572 )
573 .as_str(),
574 )
575 .await
576 .unwrap();
577 self.app_state.db.close().await;
578 Postgres::drop_database(&self.app_state.config.database_url)
579 .await
580 .unwrap();
581 });
582 }
583}
584
585struct EmptyView;
586
587impl gpui::Entity for EmptyView {
588 type Event = ();
589}
590
591impl gpui::View for EmptyView {
592 fn ui_name() -> &'static str {
593 "empty view"
594 }
595
596 fn render<'a>(&self, _: &gpui::AppContext) -> gpui::ElementBox {
597 gpui::Element::boxed(gpui::elements::Empty)
598 }
599}