1use crate::{
2 rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
3 tests::TestServer,
4};
5use call::ActiveCall;
6use channel::{Channel, ACKNOWLEDGE_DEBOUNCE_INTERVAL};
7use client::ParticipantIndex;
8use client::{Collaborator, UserId};
9use collab_ui::channel_view::ChannelView;
10use collections::HashMap;
11use editor::{Anchor, Editor, ToOffset};
12use futures::future;
13use gpui::{executor::Deterministic, ModelHandle, TestAppContext, ViewContext};
14use rpc::{proto::PeerId, RECEIVE_TIMEOUT};
15use serde_json::json;
16use std::{ops::Range, sync::Arc};
17
18#[gpui::test]
19async fn test_core_channel_buffers(
20 deterministic: Arc<Deterministic>,
21 cx_a: &mut TestAppContext,
22 cx_b: &mut TestAppContext,
23) {
24 deterministic.forbid_parking();
25 let mut server = TestServer::start(&deterministic).await;
26 let client_a = server.create_client(cx_a, "user_a").await;
27 let client_b = server.create_client(cx_b, "user_b").await;
28
29 let channel_id = server
30 .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
31 .await;
32
33 // Client A joins the channel buffer
34 let channel_buffer_a = client_a
35 .channel_store()
36 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
37 .await
38 .unwrap();
39
40 // Client A edits the buffer
41 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
42 buffer_a.update(cx_a, |buffer, cx| {
43 buffer.edit([(0..0, "hello world")], None, cx)
44 });
45 buffer_a.update(cx_a, |buffer, cx| {
46 buffer.edit([(5..5, ", cruel")], None, cx)
47 });
48 buffer_a.update(cx_a, |buffer, cx| {
49 buffer.edit([(0..5, "goodbye")], None, cx)
50 });
51 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
52 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
53 deterministic.run_until_parked();
54
55 // Client B joins the channel buffer
56 let channel_buffer_b = client_b
57 .channel_store()
58 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
59 .await
60 .unwrap();
61 channel_buffer_b.read_with(cx_b, |buffer, _| {
62 assert_collaborators(
63 buffer.collaborators(),
64 &[client_a.user_id(), client_b.user_id()],
65 );
66 });
67
68 // Client B sees the correct text, and then edits it
69 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
70 assert_eq!(
71 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
72 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
73 );
74 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
75 buffer_b.update(cx_b, |buffer, cx| {
76 buffer.edit([(7..12, "beautiful")], None, cx)
77 });
78
79 // Both A and B see the new edit
80 deterministic.run_until_parked();
81 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
82 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
83
84 // Client A closes the channel buffer.
85 cx_a.update(|_| drop(channel_buffer_a));
86 deterministic.run_until_parked();
87
88 // Client B sees that client A is gone from the channel buffer.
89 channel_buffer_b.read_with(cx_b, |buffer, _| {
90 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
91 });
92
93 // Client A rejoins the channel buffer
94 let _channel_buffer_a = client_a
95 .channel_store()
96 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
97 .await
98 .unwrap();
99 deterministic.run_until_parked();
100
101 // Sanity test, make sure we saw A rejoining
102 channel_buffer_b.read_with(cx_b, |buffer, _| {
103 assert_collaborators(
104 &buffer.collaborators(),
105 &[client_a.user_id(), client_b.user_id()],
106 );
107 });
108
109 // Client A loses connection.
110 server.forbid_connections();
111 server.disconnect_client(client_a.peer_id().unwrap());
112 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
113
114 // Client B observes A disconnect
115 channel_buffer_b.read_with(cx_b, |buffer, _| {
116 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
117 });
118
119 // TODO:
120 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
121 // - Test interaction with channel deletion while buffer is open
122}
123
// Checks which participant index is attached to remote selections.
//
// Three clients open the notes of one channel and each selects some text. The
// test asserts that remote selections carry no participant index before any
// call is joined, that clients A and B see each other with an index once they
// join the same call (client C, who never joins, stays without one), and that
// the same indices show up when A and B open a file of a shared project.
#[gpui::test]
async fn test_channel_notes_participant_indices(
    deterministic: Arc<Deterministic>,
    mut cx_a: &mut TestAppContext,
    mut cx_b: &mut TestAppContext,
    cx_c: &mut TestAppContext,
) {
    deterministic.forbid_parking();
    let mut server = TestServer::start(&deterministic).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;
    let client_c = server.create_client(cx_c, "user_c").await;

    let active_call_a = cx_a.read(ActiveCall::global);
    let active_call_b = cx_b.read(ActiveCall::global);

    cx_a.update(editor::init);
    cx_b.update(editor::init);
    cx_c.update(editor::init);

    let channel_id = server
        .make_channel(
            "the-channel",
            None,
            (&client_a, cx_a),
            &mut [(&client_b, cx_b), (&client_c, cx_c)],
        )
        .await;

    // Only client A has files on disk; B and C start with empty local projects.
    client_a
        .fs()
        .insert_tree("/root", json!({"file.txt": "123"}))
        .await;
    let (project_a, worktree_id_a) = client_a.build_local_project("/root", cx_a).await;
    let project_b = client_b.build_empty_local_project(cx_b);
    let project_c = client_c.build_empty_local_project(cx_c);
    let workspace_a = client_a.build_workspace(&project_a, cx_a).root(cx_a);
    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
    let workspace_c = client_c.build_workspace(&project_c, cx_c).root(cx_c);

    // Clients A, B, and C open the channel notes
    let channel_view_a = cx_a
        .update(|cx| ChannelView::open(channel_id, workspace_a.clone(), cx))
        .await
        .unwrap();
    let channel_view_b = cx_b
        .update(|cx| ChannelView::open(channel_id, workspace_b.clone(), cx))
        .await
        .unwrap();
    let channel_view_c = cx_c
        .update(|cx| ChannelView::open(channel_id, workspace_c.clone(), cx))
        .await
        .unwrap();

    // Clients A, B, and C all insert and select some text
    channel_view_a.update(cx_a, |notes, cx| {
        notes.editor.update(cx, |editor, cx| {
            editor.insert("a", cx);
            editor.change_selections(None, cx, |selections| {
                selections.select_ranges(vec![0..1]);
            });
        });
    });
    deterministic.run_until_parked();
    channel_view_b.update(cx_b, |notes, cx| {
        notes.editor.update(cx, |editor, cx| {
            // Presumably moves the cursor past the text already received from A
            // so that the insert appends -- TODO confirm.
            editor.move_down(&Default::default(), cx);
            editor.insert("b", cx);
            editor.change_selections(None, cx, |selections| {
                selections.select_ranges(vec![1..2]);
            });
        });
    });
    deterministic.run_until_parked();
    channel_view_c.update(cx_c, |notes, cx| {
        notes.editor.update(cx, |editor, cx| {
            editor.move_down(&Default::default(), cx);
            editor.insert("c", cx);
            editor.change_selections(None, cx, |selections| {
                selections.select_ranges(vec![2..3]);
            });
        });
    });

    // Client A sees clients B and C without assigned colors, because they aren't
    // in a call together.
    deterministic.run_until_parked();
    channel_view_a.update(cx_a, |notes, cx| {
        notes.editor.update(cx, |editor, cx| {
            assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], cx);
        });
    });

    // Clients A and B join the same call.
    for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
        call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
            .await
            .unwrap();
    }

    // Clients A and B see each other with two different assigned colors. Client C
    // still doesn't have a color.
    deterministic.run_until_parked();
    channel_view_a.update(cx_a, |notes, cx| {
        notes.editor.update(cx, |editor, cx| {
            assert_remote_selections(
                editor,
                &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
                cx,
            );
        });
    });
    channel_view_b.update(cx_b, |notes, cx| {
        notes.editor.update(cx, |editor, cx| {
            assert_remote_selections(
                editor,
                &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
                cx,
            );
        });
    });

    // Client A shares a project, and client B joins.
    let project_id = active_call_a
        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
        .await
        .unwrap();
    // These shadow client B's earlier empty project and workspace.
    let project_b = client_b.build_remote_project(project_id, cx_b).await;
    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);

    // Clients A and B open the same file.
    let editor_a = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id_a, "file.txt"), None, true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();
    let editor_b = workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id_a, "file.txt"), None, true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    editor_a.update(cx_a, |editor, cx| {
        editor.change_selections(None, cx, |selections| {
            selections.select_ranges(vec![0..1]);
        });
    });
    editor_b.update(cx_b, |editor, cx| {
        editor.change_selections(None, cx, |selections| {
            selections.select_ranges(vec![2..3]);
        });
    });
    deterministic.run_until_parked();

    // Clients A and B see each other with the same colors as in the channel notes.
    editor_a.update(cx_a, |editor, cx| {
        assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], cx);
    });
    editor_b.update(cx_b, |editor, cx| {
        assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], cx);
    });
}
292
293#[track_caller]
294fn assert_remote_selections(
295 editor: &mut Editor,
296 expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
297 cx: &mut ViewContext<Editor>,
298) {
299 let snapshot = editor.snapshot(cx);
300 let range = Anchor::min()..Anchor::max();
301 let remote_selections = snapshot
302 .remote_selections_in_range(&range, editor.collaboration_hub().unwrap(), cx)
303 .map(|s| {
304 let start = s.selection.start.to_offset(&snapshot.buffer_snapshot);
305 let end = s.selection.end.to_offset(&snapshot.buffer_snapshot);
306 (s.participant_index, start..end)
307 })
308 .collect::<Vec<_>>();
309 assert_eq!(
310 remote_selections, expected_selections,
311 "incorrect remote selections"
312 );
313}
314
// Verifies that concurrent requests to open the same channel buffer resolve to
// one shared model handle, and that after all handles are dropped the buffer
// can be opened again as a new model that still contains the earlier edit.
#[gpui::test]
async fn test_multiple_handles_to_channel_buffer(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
) {
    deterministic.forbid_parking();
    let mut server = TestServer::start(&deterministic).await;
    let client_a = server.create_client(cx_a, "user_a").await;

    let channel_id = server
        .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
        .await;

    // Start three open requests without awaiting any of them, so that they are
    // all in flight at the same time.
    let channel_buffer_1 = client_a
        .channel_store()
        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
    let channel_buffer_2 = client_a
        .channel_store()
        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
    let channel_buffer_3 = client_a
        .channel_store()
        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));

    // All concurrent tasks for opening a channel buffer return the same model handle.
    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
            .await
            .unwrap();
    // Remember the model id so the reopened buffer can be told apart later.
    let channel_buffer_model_id = channel_buffer.id();
    assert_eq!(channel_buffer, channel_buffer_2);
    assert_eq!(channel_buffer, channel_buffer_3);

    channel_buffer.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(0..0, "hello")], None, cx);
        })
    });
    deterministic.run_until_parked();

    // Drop every handle to the channel buffer.
    cx_a.update(|_| {
        drop(channel_buffer);
        drop(channel_buffer_2);
        drop(channel_buffer_3);
    });
    deterministic.run_until_parked();

    // The channel buffer can be reopened after dropping it.
    let channel_buffer = client_a
        .channel_store()
        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();
    // It is a different model than before, but it has the previously written text.
    assert_ne!(channel_buffer.id(), channel_buffer_model_id);
    channel_buffer.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, _| {
            assert_eq!(buffer.text(), "hello");
        })
    });
}
374
// Verifies that an open channel buffer reports itself as disconnected in two
// situations: when its own client loses the connection (client A), and when
// the channel is removed while another client (client B) has the buffer open.
// In both cases the buffer is expected to still expose the channel's metadata.
#[gpui::test]
async fn test_channel_buffer_disconnect(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
) {
    deterministic.forbid_parking();
    let mut server = TestServer::start(&deterministic).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;

    let channel_id = server
        .make_channel(
            "the-channel",
            None,
            (&client_a, cx_a),
            &mut [(&client_b, cx_b)],
        )
        .await;

    let channel_buffer_a = client_a
        .channel_store()
        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();
    let channel_buffer_b = client_b
        .channel_store()
        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();

    // Client A loses its connection while new connections are forbidden.
    server.forbid_connections();
    server.disconnect_client(client_a.peer_id().unwrap());
    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);

    // Client A's buffer still knows its channel, but is no longer connected.
    channel_buffer_a.update(cx_a, |buffer, _| {
        assert_eq!(
            buffer.channel().as_ref(),
            &channel(channel_id, "the-channel")
        );
        assert!(!buffer.is_connected());
    });

    deterministic.run_until_parked();

    // Connections are allowed again; advance the clock by the same interval as above.
    server.allow_connections();
    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);

    deterministic.run_until_parked();

    // Client A removes the channel while client B still has its buffer open.
    client_a
        .channel_store()
        .update(cx_a, |channel_store, _| {
            channel_store.remove_channel(channel_id)
        })
        .await
        .unwrap();
    deterministic.run_until_parked();

    // Channel buffer observed the deletion
    channel_buffer_b.update(cx_b, |buffer, _| {
        assert_eq!(
            buffer.channel().as_ref(),
            &channel(channel_id, "the-channel")
        );
        assert!(!buffer.is_connected());
    });
}
443
444fn channel(id: u64, name: &'static str) -> Channel {
445 Channel {
446 id,
447 name: name.to_string(),
448 unseen_note_version: None,
449 unseen_message_id: None,
450 }
451}
452
// Verifies that edits made on both sides while client A is disconnected are
// merged once A reconnects, and that both clients then report the same set of
// collaborators for the channel buffer.
#[gpui::test]
async fn test_rejoin_channel_buffer(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
) {
    deterministic.forbid_parking();
    let mut server = TestServer::start(&deterministic).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;

    let channel_id = server
        .make_channel(
            "the-channel",
            None,
            (&client_a, cx_a),
            &mut [(&client_b, cx_b)],
        )
        .await;

    let channel_buffer_a = client_a
        .channel_store()
        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();
    let channel_buffer_b = client_b
        .channel_store()
        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();

    // Seed the buffer with "1" and let the edit propagate while both clients
    // are still connected.
    channel_buffer_a.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(0..0, "1")], None, cx);
        })
    });
    deterministic.run_until_parked();

    // Client A disconnects.
    server.forbid_connections();
    server.disconnect_client(client_a.peer_id().unwrap());

    // Both clients make an edit.
    channel_buffer_a.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(1..1, "2")], None, cx);
        })
    });
    channel_buffer_b.update(cx_b, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(0..0, "0")], None, cx);
        })
    });

    // Both clients see their own edit.
    deterministic.run_until_parked();
    channel_buffer_a.read_with(cx_a, |buffer, cx| {
        assert_eq!(buffer.buffer().read(cx).text(), "12");
    });
    channel_buffer_b.read_with(cx_b, |buffer, cx| {
        assert_eq!(buffer.buffer().read(cx).text(), "01");
    });

    // Client A reconnects. Both clients see each other's edits, and see
    // the same collaborators.
    server.allow_connections();
    deterministic.advance_clock(RECEIVE_TIMEOUT);
    channel_buffer_a.read_with(cx_a, |buffer, cx| {
        assert_eq!(buffer.buffer().read(cx).text(), "012");
    });
    channel_buffer_b.read_with(cx_b, |buffer, cx| {
        assert_eq!(buffer.buffer().read(cx).text(), "012");
    });

    // The two reads are nested so that both collaborator maps are borrowed at
    // the same time for the comparison.
    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
        });
    });
}
533
// Verifies that channel buffers survive a server restart: clients A and B edit
// while the server is down and converge on the same text afterwards, while
// client C, which is prevented from reconnecting, is no longer listed as a
// collaborator.
#[gpui::test]
async fn test_channel_buffers_and_server_restarts(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
    cx_c: &mut TestAppContext,
) {
    deterministic.forbid_parking();
    let mut server = TestServer::start(&deterministic).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;
    let client_c = server.create_client(cx_c, "user_c").await;

    let channel_id = server
        .make_channel(
            "the-channel",
            None,
            (&client_a, cx_a),
            &mut [(&client_b, cx_b), (&client_c, cx_c)],
        )
        .await;

    let channel_buffer_a = client_a
        .channel_store()
        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();
    let channel_buffer_b = client_b
        .channel_store()
        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();
    // Client C's handle is bound only so that its buffer stays open for the
    // duration of the test.
    let _channel_buffer_c = client_c
        .channel_store()
        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();

    channel_buffer_a.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(0..0, "1")], None, cx);
        })
    });
    deterministic.run_until_parked();

    // Client C can't reconnect.
    // (Its connection attempts are replaced by a future that never completes.)
    client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));

    // Server stops.
    server.reset().await;
    deterministic.advance_clock(RECEIVE_TIMEOUT);

    // While the server is down, both clients make an edit.
    channel_buffer_a.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(1..1, "2")], None, cx);
        })
    });
    channel_buffer_b.update(cx_b, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(0..0, "0")], None, cx);
        })
    });

    // Server restarts.
    server.start().await.unwrap();
    deterministic.advance_clock(CLEANUP_TIMEOUT);

    // Clients reconnect. Clients A and B see each other's edits, and see
    // that client C has disconnected.
    channel_buffer_a.read_with(cx_a, |buffer, cx| {
        assert_eq!(buffer.buffer().read(cx).text(), "012");
    });
    channel_buffer_b.read_with(cx_b, |buffer, cx| {
        assert_eq!(buffer.buffer().read(cx).text(), "012");
    });

    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
            assert_collaborators(
                buffer_a.collaborators(),
                &[client_a.user_id(), client_b.user_id()],
            );
            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
        });
    });
}
621
622#[gpui::test(iterations = 10)]
623async fn test_following_to_channel_notes_without_a_shared_project(
624 deterministic: Arc<Deterministic>,
625 mut cx_a: &mut TestAppContext,
626 mut cx_b: &mut TestAppContext,
627 mut cx_c: &mut TestAppContext,
628) {
629 deterministic.forbid_parking();
630 let mut server = TestServer::start(&deterministic).await;
631 let client_a = server.create_client(cx_a, "user_a").await;
632 let client_b = server.create_client(cx_b, "user_b").await;
633
634 let client_c = server.create_client(cx_c, "user_c").await;
635
636 cx_a.update(editor::init);
637 cx_b.update(editor::init);
638 cx_c.update(editor::init);
639 cx_a.update(collab_ui::channel_view::init);
640 cx_b.update(collab_ui::channel_view::init);
641 cx_c.update(collab_ui::channel_view::init);
642
643 let channel_1_id = server
644 .make_channel(
645 "channel-1",
646 None,
647 (&client_a, cx_a),
648 &mut [(&client_b, cx_b), (&client_c, cx_c)],
649 )
650 .await;
651 let channel_2_id = server
652 .make_channel(
653 "channel-2",
654 None,
655 (&client_a, cx_a),
656 &mut [(&client_b, cx_b), (&client_c, cx_c)],
657 )
658 .await;
659
660 // Clients A, B, and C join a channel.
661 let active_call_a = cx_a.read(ActiveCall::global);
662 let active_call_b = cx_b.read(ActiveCall::global);
663 let active_call_c = cx_c.read(ActiveCall::global);
664 for (call, cx) in [
665 (&active_call_a, &mut cx_a),
666 (&active_call_b, &mut cx_b),
667 (&active_call_c, &mut cx_c),
668 ] {
669 call.update(*cx, |call, cx| call.join_channel(channel_1_id, cx))
670 .await
671 .unwrap();
672 }
673 deterministic.run_until_parked();
674
675 // Clients A, B, and C all open their own unshared projects.
676 client_a.fs().insert_tree("/a", json!({})).await;
677 client_b.fs().insert_tree("/b", json!({})).await;
678 client_c.fs().insert_tree("/c", json!({})).await;
679 let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
680 let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
681 let (project_c, _) = client_b.build_local_project("/c", cx_c).await;
682 let workspace_a = client_a.build_workspace(&project_a, cx_a).root(cx_a);
683 let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
684 let _workspace_c = client_c.build_workspace(&project_c, cx_c).root(cx_c);
685
686 active_call_a
687 .update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
688 .await
689 .unwrap();
690
691 // Client A opens the notes for channel 1.
692 let channel_view_1_a = cx_a
693 .update(|cx| ChannelView::open(channel_1_id, workspace_a.clone(), cx))
694 .await
695 .unwrap();
696 channel_view_1_a.update(cx_a, |notes, cx| {
697 assert_eq!(notes.channel(cx).name, "channel-1");
698 notes.editor.update(cx, |editor, cx| {
699 editor.insert("Hello from A.", cx);
700 editor.change_selections(None, cx, |selections| {
701 selections.select_ranges(vec![3..4]);
702 });
703 });
704 });
705
706 // Client B follows client A.
707 workspace_b
708 .update(cx_b, |workspace, cx| {
709 workspace.follow(client_a.peer_id().unwrap(), cx).unwrap()
710 })
711 .await
712 .unwrap();
713
714 // Client B is taken to the notes for channel 1, with the same
715 // text selected as client A.
716 deterministic.run_until_parked();
717 let channel_view_1_b = workspace_b.read_with(cx_b, |workspace, cx| {
718 assert_eq!(
719 workspace.leader_for_pane(workspace.active_pane()),
720 Some(client_a.peer_id().unwrap())
721 );
722 workspace
723 .active_item(cx)
724 .expect("no active item")
725 .downcast::<ChannelView>()
726 .expect("active item is not a channel view")
727 });
728 channel_view_1_b.read_with(cx_b, |notes, cx| {
729 assert_eq!(notes.channel(cx).name, "channel-1");
730 let editor = notes.editor.read(cx);
731 assert_eq!(editor.text(cx), "Hello from A.");
732 assert_eq!(editor.selections.ranges::<usize>(cx), &[3..4]);
733 });
734
735 // Client A opens the notes for channel 2.
736 let channel_view_2_a = cx_a
737 .update(|cx| ChannelView::open(channel_2_id, workspace_a.clone(), cx))
738 .await
739 .unwrap();
740 channel_view_2_a.read_with(cx_a, |notes, cx| {
741 assert_eq!(notes.channel(cx).name, "channel-2");
742 });
743
744 // Client B is taken to the notes for channel 2.
745 deterministic.run_until_parked();
746 let channel_view_2_b = workspace_b.read_with(cx_b, |workspace, cx| {
747 assert_eq!(
748 workspace.leader_for_pane(workspace.active_pane()),
749 Some(client_a.peer_id().unwrap())
750 );
751 workspace
752 .active_item(cx)
753 .expect("no active item")
754 .downcast::<ChannelView>()
755 .expect("active item is not a channel view")
756 });
757 channel_view_2_b.read_with(cx_b, |notes, cx| {
758 assert_eq!(notes.channel(cx).name, "channel-2");
759 });
760}
761
// Verifies the "channel buffer has changed" flag that client B's channel store
// reports for a channel. The flag is checked after each of these steps:
// client A edits while B has the notes closed, B opens the notes, A edits while
// B has them open, B goes through a long connection interruption, and A edits
// again after B has closed the notes.
#[gpui::test]
async fn test_channel_buffer_changes(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
) {
    deterministic.forbid_parking();
    let mut server = TestServer::start(&deterministic).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;

    let channel_id = server
        .make_channel(
            "the-channel",
            None,
            (&client_a, cx_a),
            &mut [(&client_b, cx_b)],
        )
        .await;

    let channel_buffer_a = client_a
        .channel_store()
        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
        .await
        .unwrap();

    // Client A makes an edit, and client B should see that the note has changed.
    channel_buffer_a.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(0..0, "1")], None, cx);
        })
    });
    deterministic.run_until_parked();

    let has_buffer_changed = cx_b.read(|cx| {
        client_b
            .channel_store()
            .read(cx)
            .has_channel_buffer_changed(channel_id)
            .unwrap()
    });
    assert!(has_buffer_changed);

    // Opening the buffer should clear the changed flag.
    let project_b = client_b.build_empty_local_project(cx_b);
    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
    let channel_view_b = cx_b
        .update(|cx| ChannelView::open(channel_id, workspace_b.clone(), cx))
        .await
        .unwrap();
    deterministic.run_until_parked();

    let has_buffer_changed = cx_b.read(|cx| {
        client_b
            .channel_store()
            .read(cx)
            .has_channel_buffer_changed(channel_id)
            .unwrap()
    });
    assert!(!has_buffer_changed);

    // Editing the channel while the buffer is open should not show that the buffer has changed.
    channel_buffer_a.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(0..0, "2")], None, cx);
        })
    });
    deterministic.run_until_parked();

    let has_buffer_changed = cx_b.read(|cx| {
        client_b
            .channel_store()
            .read(cx)
            .has_channel_buffer_changed(channel_id)
            .unwrap()
    });
    assert!(!has_buffer_changed);

    // NOTE(review): presumably gives the debounced acknowledgement of the
    // latest buffer version time to be sent before the connection is
    // interrupted -- confirm against the channel store implementation.
    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);

    // Test that the server is tracking things correctly, and we retain our 'not changed'
    // state across a disconnect
    server.simulate_long_connection_interruption(client_b.peer_id().unwrap(), &deterministic);
    let has_buffer_changed = cx_b.read(|cx| {
        client_b
            .channel_store()
            .read(cx)
            .has_channel_buffer_changed(channel_id)
            .unwrap()
    });
    assert!(!has_buffer_changed);

    // Closing the buffer should re-enable change tracking
    cx_b.update(|cx| {
        workspace_b.update(cx, |workspace, cx| {
            workspace.close_all_items_and_panes(&Default::default(), cx)
        });

        // Also release the test's own handle to the channel view.
        drop(channel_view_b)
    });

    deterministic.run_until_parked();

    channel_buffer_a.update(cx_a, |buffer, cx| {
        buffer.buffer().update(cx, |buffer, cx| {
            buffer.edit([(0..0, "3")], None, cx);
        })
    });
    deterministic.run_until_parked();

    let has_buffer_changed = cx_b.read(|cx| {
        client_b
            .channel_store()
            .read(cx)
            .has_channel_buffer_changed(channel_id)
            .unwrap()
    });
    assert!(has_buffer_changed);
}
881
882#[track_caller]
883fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
884 let mut user_ids = collaborators
885 .values()
886 .map(|collaborator| collaborator.user_id)
887 .collect::<Vec<_>>();
888 user_ids.sort();
889 assert_eq!(
890 user_ids,
891 ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
892 );
893}
894
895fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
896 channel_buffer.read_with(cx, |buffer, _| buffer.text())
897}