1use crate::{
2 auth,
3 db::{self, UserId},
4 github, rpc, AppState, Config,
5};
6use async_std::task;
7use gpui::TestAppContext;
8use rand::prelude::*;
9use serde_json::json;
10use sqlx::{
11 migrate::{MigrateDatabase, Migrator},
12 types::time::OffsetDateTime,
13 Executor as _, Postgres,
14};
15use std::{path::Path, sync::Arc};
16use zed::{
17 channel::{ChannelDetails, ChannelList},
18 editor::Editor,
19 fs::{FakeFs, Fs as _},
20 language::LanguageRegistry,
21 rpc::Client,
22 settings,
23 test::Channel,
24 worktree::Worktree,
25};
26use zrpc::Peer;
27
28#[gpui::test]
29async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
30 tide::log::start();
31
32 let (window_b, _) = cx_b.add_window(|_| EmptyView);
33 let settings = settings::channel(&cx_b.font_cache()).unwrap().1;
34 let lang_registry = Arc::new(LanguageRegistry::new());
35
36 // Connect to a server as 2 clients.
37 let mut server = TestServer::start().await;
38 let (_, client_a) = server.create_client(&mut cx_a, "user_a").await;
39 let (_, client_b) = server.create_client(&mut cx_b, "user_b").await;
40
41 cx_a.foreground().forbid_parking();
42
43 // Share a local worktree as client A
44 let fs = Arc::new(FakeFs::new());
45 fs.insert_tree(
46 "/a",
47 json!({
48 "a.txt": "a-contents",
49 "b.txt": "b-contents",
50 }),
51 )
52 .await;
53 let worktree_a = Worktree::open_local(
54 "/a".as_ref(),
55 lang_registry.clone(),
56 fs,
57 &mut cx_a.to_async(),
58 )
59 .await
60 .unwrap();
61 worktree_a
62 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
63 .await;
64 let (worktree_id, worktree_token) = worktree_a
65 .update(&mut cx_a, |tree, cx| {
66 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
67 })
68 .await
69 .unwrap();
70
71 // Join that worktree as client B, and see that a guest has joined as client A.
72 let worktree_b = Worktree::open_remote(
73 client_b.clone(),
74 worktree_id,
75 worktree_token,
76 lang_registry.clone(),
77 &mut cx_b.to_async(),
78 )
79 .await
80 .unwrap();
81 let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
82 worktree_a
83 .condition(&cx_a, |tree, _| {
84 tree.peers()
85 .values()
86 .any(|replica_id| *replica_id == replica_id_b)
87 })
88 .await;
89
90 // Open the same file as client B and client A.
91 let buffer_b = worktree_b
92 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
93 .await
94 .unwrap();
95 buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
96 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
97 let buffer_a = worktree_a
98 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
99 .await
100 .unwrap();
101
102 // Create a selection set as client B and see that selection set as client A.
103 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, settings, cx));
104 buffer_a
105 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
106 .await;
107
108 // Edit the buffer as client B and see that edit as client A.
109 editor_b.update(&mut cx_b, |editor, cx| {
110 editor.insert(&"ok, ".to_string(), cx)
111 });
112 buffer_a
113 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
114 .await;
115
116 // Remove the selection set as client B, see those selections disappear as client A.
117 cx_b.update(move |_| drop(editor_b));
118 buffer_a
119 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
120 .await;
121
122 // Close the buffer as client A, see that the buffer is closed.
123 drop(buffer_a);
124 worktree_a
125 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
126 .await;
127
128 // Dropping the worktree removes client B from client A's peers.
129 cx_b.update(move |_| drop(worktree_b));
130 worktree_a
131 .condition(&cx_a, |tree, _| tree.peers().is_empty())
132 .await;
133}
134
135#[gpui::test]
136async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
137 mut cx_a: TestAppContext,
138 mut cx_b: TestAppContext,
139 mut cx_c: TestAppContext,
140) {
141 let lang_registry = Arc::new(LanguageRegistry::new());
142
143 // Connect to a server as 3 clients.
144 let mut server = TestServer::start().await;
145 let (_, client_a) = server.create_client(&mut cx_a, "user_a").await;
146 let (_, client_b) = server.create_client(&mut cx_b, "user_b").await;
147 let (_, client_c) = server.create_client(&mut cx_c, "user_c").await;
148
149 cx_a.foreground().forbid_parking();
150
151 let fs = Arc::new(FakeFs::new());
152
153 // Share a worktree as client A.
154 fs.insert_tree(
155 "/a",
156 json!({
157 "file1": "",
158 "file2": ""
159 }),
160 )
161 .await;
162
163 let worktree_a = Worktree::open_local(
164 "/a".as_ref(),
165 lang_registry.clone(),
166 fs.clone(),
167 &mut cx_a.to_async(),
168 )
169 .await
170 .unwrap();
171 worktree_a
172 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
173 .await;
174 let (worktree_id, worktree_token) = worktree_a
175 .update(&mut cx_a, |tree, cx| {
176 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
177 })
178 .await
179 .unwrap();
180
181 // Join that worktree as clients B and C.
182 let worktree_b = Worktree::open_remote(
183 client_b.clone(),
184 worktree_id,
185 worktree_token.clone(),
186 lang_registry.clone(),
187 &mut cx_b.to_async(),
188 )
189 .await
190 .unwrap();
191 let worktree_c = Worktree::open_remote(
192 client_c.clone(),
193 worktree_id,
194 worktree_token,
195 lang_registry.clone(),
196 &mut cx_c.to_async(),
197 )
198 .await
199 .unwrap();
200
201 // Open and edit a buffer as both guests B and C.
202 let buffer_b = worktree_b
203 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
204 .await
205 .unwrap();
206 let buffer_c = worktree_c
207 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
208 .await
209 .unwrap();
210 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
211 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
212
213 // Open and edit that buffer as the host.
214 let buffer_a = worktree_a
215 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
216 .await
217 .unwrap();
218
219 buffer_a
220 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
221 .await;
222 buffer_a.update(&mut cx_a, |buf, cx| {
223 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
224 });
225
226 // Wait for edits to propagate
227 buffer_a
228 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
229 .await;
230 buffer_b
231 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
232 .await;
233 buffer_c
234 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
235 .await;
236
237 // Edit the buffer as the host and concurrently save as guest B.
238 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
239 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
240 save_b.await.unwrap();
241 assert_eq!(
242 fs.load("/a/file1".as_ref()).await.unwrap(),
243 "hi-a, i-am-c, i-am-b, i-am-a"
244 );
245 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
246 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
247 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
248
249 // Make changes on host's file system, see those changes on the guests.
250 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
251 .await
252 .unwrap();
253 fs.insert_file(Path::new("/a/file4"), "4".into())
254 .await
255 .unwrap();
256
257 worktree_b
258 .condition(&cx_b, |tree, _| tree.file_count() == 3)
259 .await;
260 worktree_c
261 .condition(&cx_c, |tree, _| tree.file_count() == 3)
262 .await;
263 worktree_b.read_with(&cx_b, |tree, _| {
264 assert_eq!(
265 tree.paths()
266 .map(|p| p.to_string_lossy())
267 .collect::<Vec<_>>(),
268 &["file1", "file3", "file4"]
269 )
270 });
271 worktree_c.read_with(&cx_c, |tree, _| {
272 assert_eq!(
273 tree.paths()
274 .map(|p| p.to_string_lossy())
275 .collect::<Vec<_>>(),
276 &["file1", "file3", "file4"]
277 )
278 });
279}
280
281#[gpui::test]
282async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
283 let lang_registry = Arc::new(LanguageRegistry::new());
284
285 // Connect to a server as 2 clients.
286 let mut server = TestServer::start().await;
287 let (_, client_a) = server.create_client(&mut cx_a, "user_a").await;
288 let (_, client_b) = server.create_client(&mut cx_b, "user_b").await;
289
290 cx_a.foreground().forbid_parking();
291
292 // Share a local worktree as client A
293 let fs = Arc::new(FakeFs::new());
294 fs.save(Path::new("/a.txt"), &"a-contents".into())
295 .await
296 .unwrap();
297 let worktree_a = Worktree::open_local(
298 "/".as_ref(),
299 lang_registry.clone(),
300 fs,
301 &mut cx_a.to_async(),
302 )
303 .await
304 .unwrap();
305 worktree_a
306 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
307 .await;
308 let (worktree_id, worktree_token) = worktree_a
309 .update(&mut cx_a, |tree, cx| {
310 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
311 })
312 .await
313 .unwrap();
314
315 // Join that worktree as client B, and see that a guest has joined as client A.
316 let worktree_b = Worktree::open_remote(
317 client_b.clone(),
318 worktree_id,
319 worktree_token,
320 lang_registry.clone(),
321 &mut cx_b.to_async(),
322 )
323 .await
324 .unwrap();
325
326 let buffer_b = worktree_b
327 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
328 .await
329 .unwrap();
330 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime);
331
332 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
333 buffer_b.read_with(&cx_b, |buf, _| {
334 assert!(buf.is_dirty());
335 assert!(!buf.has_conflict());
336 });
337
338 buffer_b
339 .update(&mut cx_b, |buf, cx| buf.save(cx))
340 .unwrap()
341 .await
342 .unwrap();
343 worktree_b
344 .condition(&cx_b, |_, cx| {
345 buffer_b.read(cx).file().unwrap().mtime != mtime
346 })
347 .await;
348 buffer_b.read_with(&cx_b, |buf, _| {
349 assert!(!buf.is_dirty());
350 assert!(!buf.has_conflict());
351 });
352
353 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
354 buffer_b.read_with(&cx_b, |buf, _| {
355 assert!(buf.is_dirty());
356 assert!(!buf.has_conflict());
357 });
358}
359
360#[gpui::test]
361async fn test_editing_while_guest_opens_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
362 let lang_registry = Arc::new(LanguageRegistry::new());
363
364 // Connect to a server as 2 clients.
365 let mut server = TestServer::start().await;
366 let (_, client_a) = server.create_client(&mut cx_a, "user_a").await;
367 let (_, client_b) = server.create_client(&mut cx_b, "user_b").await;
368
369 cx_a.foreground().forbid_parking();
370
371 // Share a local worktree as client A
372 let fs = Arc::new(FakeFs::new());
373 fs.save(Path::new("/a.txt"), &"a-contents".into())
374 .await
375 .unwrap();
376 let worktree_a = Worktree::open_local(
377 "/".as_ref(),
378 lang_registry.clone(),
379 fs,
380 &mut cx_a.to_async(),
381 )
382 .await
383 .unwrap();
384 worktree_a
385 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
386 .await;
387 let (worktree_id, worktree_token) = worktree_a
388 .update(&mut cx_a, |tree, cx| {
389 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
390 })
391 .await
392 .unwrap();
393
394 // Join that worktree as client B, and see that a guest has joined as client A.
395 let worktree_b = Worktree::open_remote(
396 client_b.clone(),
397 worktree_id,
398 worktree_token,
399 lang_registry.clone(),
400 &mut cx_b.to_async(),
401 )
402 .await
403 .unwrap();
404
405 let buffer_a = worktree_a
406 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
407 .await
408 .unwrap();
409 let buffer_b = cx_b
410 .background()
411 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
412
413 task::yield_now().await;
414 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
415
416 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
417 let buffer_b = buffer_b.await.unwrap();
418 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
419}
420
421#[gpui::test]
422async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
423 let lang_registry = Arc::new(LanguageRegistry::new());
424
425 // Connect to a server as 2 clients.
426 let mut server = TestServer::start().await;
427 let (_, client_a) = server.create_client(&mut cx_a, "user_a").await;
428 let (_, client_b) = server.create_client(&mut cx_a, "user_b").await;
429
430 cx_a.foreground().forbid_parking();
431
432 // Share a local worktree as client A
433 let fs = Arc::new(FakeFs::new());
434 fs.insert_tree(
435 "/a",
436 json!({
437 "a.txt": "a-contents",
438 "b.txt": "b-contents",
439 }),
440 )
441 .await;
442 let worktree_a = Worktree::open_local(
443 "/a".as_ref(),
444 lang_registry.clone(),
445 fs,
446 &mut cx_a.to_async(),
447 )
448 .await
449 .unwrap();
450 worktree_a
451 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
452 .await;
453 let (worktree_id, worktree_token) = worktree_a
454 .update(&mut cx_a, |tree, cx| {
455 tree.as_local_mut().unwrap().share(client_a.clone(), cx)
456 })
457 .await
458 .unwrap();
459
460 // Join that worktree as client B, and see that a guest has joined as client A.
461 let _worktree_b = Worktree::open_remote(
462 client_b.clone(),
463 worktree_id,
464 worktree_token,
465 lang_registry.clone(),
466 &mut cx_b.to_async(),
467 )
468 .await
469 .unwrap();
470 worktree_a
471 .condition(&cx_a, |tree, _| tree.peers().len() == 1)
472 .await;
473
474 // Drop client B's connection and ensure client A observes client B leaving the worktree.
475 client_b.disconnect().await.unwrap();
476 worktree_a
477 .condition(&cx_a, |tree, _| tree.peers().len() == 0)
478 .await;
479}
480
481#[gpui::test]
482async fn test_basic_chat(mut cx_a: TestAppContext, cx_b: TestAppContext) {
483 // Connect to a server as 2 clients.
484 let mut server = TestServer::start().await;
485 let (user_id_a, client_a) = server.create_client(&mut cx_a, "user_a").await;
486 let (user_id_b, client_b) = server.create_client(&mut cx_a, "user_b").await;
487
488 // Create an org that includes these 2 users.
489 let db = &server.app_state.db;
490 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
491 db.add_org_member(org_id, user_id_a, false).await.unwrap();
492 db.add_org_member(org_id, user_id_b, false).await.unwrap();
493
494 // Create a channel that includes all the users.
495 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
496 db.add_channel_member(channel_id, user_id_a, false)
497 .await
498 .unwrap();
499 db.add_channel_member(channel_id, user_id_b, false)
500 .await
501 .unwrap();
502 db.create_channel_message(
503 channel_id,
504 user_id_b,
505 "hello A, it's B.",
506 OffsetDateTime::now_utc(),
507 )
508 .await
509 .unwrap();
510
511 let channels_a = ChannelList::new(client_a, &mut cx_a.to_async())
512 .await
513 .unwrap();
514 channels_a.read_with(&cx_a, |list, _| {
515 assert_eq!(
516 list.available_channels(),
517 &[ChannelDetails {
518 id: channel_id.to_proto(),
519 name: "test-channel".to_string()
520 }]
521 )
522 });
523
524 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
525 this.get_channel(channel_id.to_proto(), cx).unwrap()
526 });
527 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
528 channel_a.next_notification(&cx_a).await;
529 channel_a.read_with(&cx_a, |channel, _| {
530 assert_eq!(
531 channel
532 .messages()
533 .iter()
534 .map(|m| (m.sender_id, m.body.as_ref()))
535 .collect::<Vec<_>>(),
536 &[(user_id_b.to_proto(), "hello A, it's B.")]
537 );
538 });
539
540 channel_a.update(&mut cx_a, |channel, cx| {
541 channel.send_message("oh, hi B.".to_string(), cx).unwrap();
542 channel.send_message("sup".to_string(), cx).unwrap();
543 assert_eq!(
544 channel
545 .pending_messages()
546 .iter()
547 .map(|m| &m.body)
548 .collect::<Vec<_>>(),
549 &["oh, hi B.", "sup"]
550 )
551 });
552
553 channel_a.next_notification(&cx_a).await;
554}
555
556struct TestServer {
557 peer: Arc<Peer>,
558 app_state: Arc<AppState>,
559 server: Arc<rpc::Server>,
560 db_name: String,
561}
562
563impl TestServer {
564 async fn start() -> Self {
565 let mut rng = StdRng::from_entropy();
566 let db_name = format!("zed-test-{}", rng.gen::<u128>());
567 let app_state = Self::build_app_state(&db_name).await;
568 let peer = Peer::new();
569 let server = rpc::Server::new(app_state.clone(), peer.clone());
570 Self {
571 peer,
572 app_state,
573 server,
574 db_name,
575 }
576 }
577
578 async fn create_client(
579 &mut self,
580 cx: &mut TestAppContext,
581 name: &str,
582 ) -> (UserId, Arc<Client>) {
583 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
584 let client = Client::new();
585 let (client_conn, server_conn) = Channel::bidirectional();
586 cx.background()
587 .spawn(
588 self.server
589 .handle_connection(server_conn, name.to_string(), user_id),
590 )
591 .detach();
592 client
593 .add_connection(user_id.to_proto(), client_conn, cx.to_async())
594 .await
595 .unwrap();
596 (user_id, client)
597 }
598
599 async fn build_app_state(db_name: &str) -> Arc<AppState> {
600 let mut config = Config::default();
601 config.session_secret = "a".repeat(32);
602 config.database_url = format!("postgres://postgres@localhost/{}", db_name);
603
604 Self::create_db(&config.database_url).await;
605 let db = db::Db(
606 db::DbOptions::new()
607 .max_connections(5)
608 .connect(&config.database_url)
609 .await
610 .expect("failed to connect to postgres database"),
611 );
612 let migrator = Migrator::new(Path::new(concat!(
613 env!("CARGO_MANIFEST_DIR"),
614 "/migrations"
615 )))
616 .await
617 .unwrap();
618 migrator.run(&db.0).await.unwrap();
619
620 let github_client = github::AppClient::test();
621 Arc::new(AppState {
622 db,
623 handlebars: Default::default(),
624 auth_client: auth::build_client("", ""),
625 repo_client: github::RepoClient::test(&github_client),
626 github_client,
627 config,
628 })
629 }
630
631 async fn create_db(url: &str) {
632 // Enable tests to run in parallel by serializing the creation of each test database.
633 lazy_static::lazy_static! {
634 static ref DB_CREATION: async_std::sync::Mutex<()> = async_std::sync::Mutex::new(());
635 }
636
637 let _lock = DB_CREATION.lock().await;
638 Postgres::create_database(url)
639 .await
640 .expect("failed to create test database");
641 }
642}
643
644impl Drop for TestServer {
645 fn drop(&mut self) {
646 task::block_on(async {
647 self.peer.reset().await;
648 self.app_state
649 .db
650 .execute(
651 format!(
652 "
653 SELECT pg_terminate_backend(pg_stat_activity.pid)
654 FROM pg_stat_activity
655 WHERE pg_stat_activity.datname = '{}' AND pid <> pg_backend_pid();",
656 self.db_name,
657 )
658 .as_str(),
659 )
660 .await
661 .unwrap();
662 self.app_state.db.close().await;
663 Postgres::drop_database(&self.app_state.config.database_url)
664 .await
665 .unwrap();
666 });
667 }
668}
669
670struct EmptyView;
671
672impl gpui::Entity for EmptyView {
673 type Event = ();
674}
675
676impl gpui::View for EmptyView {
677 fn ui_name() -> &'static str {
678 "empty view"
679 }
680
681 fn render<'a>(&self, _: &gpui::RenderContext<Self>) -> gpui::ElementBox {
682 gpui::Element::boxed(gpui::elements::Empty)
683 }
684}