1use crate::{
2 rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
3 tests::{TestServer, test_server::open_channel_notes},
4};
5use call::ActiveCall;
6use channel::ACKNOWLEDGE_DEBOUNCE_INTERVAL;
7use client::{Collaborator, ParticipantIndex, UserId};
8use collab_ui::channel_view::ChannelView;
9use collections::HashMap;
10use editor::{Anchor, Editor, MultiBufferOffset, ToOffset};
11use futures::future;
12use gpui::{BackgroundExecutor, Context, Entity, TestAppContext, Window};
13use rpc::{RECEIVE_TIMEOUT, proto::PeerId};
14use serde_json::json;
15use std::ops::Range;
16use util::rel_path::rel_path;
17use workspace::CollaboratorId;
18
19#[gpui::test]
20async fn test_core_channel_buffers(
21 executor: BackgroundExecutor,
22 cx_a: &mut TestAppContext,
23 cx_b: &mut TestAppContext,
24) {
25 let mut server = TestServer::start(executor.clone()).await;
26 let client_a = server.create_client(cx_a, "user_a").await;
27 let client_b = server.create_client(cx_b, "user_b").await;
28
29 let channel_id = server
30 .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
31 .await;
32
33 // Client A joins the channel buffer
34 let channel_buffer_a = client_a
35 .channel_store()
36 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
37 .await
38 .unwrap();
39
40 // Client A edits the buffer
41 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
42 buffer_a.update(cx_a, |buffer, cx| {
43 buffer.edit([(0..0, "hello world")], None, cx)
44 });
45 buffer_a.update(cx_a, |buffer, cx| {
46 buffer.edit([(5..5, ", cruel")], None, cx)
47 });
48 buffer_a.update(cx_a, |buffer, cx| {
49 buffer.edit([(0..5, "goodbye")], None, cx)
50 });
51 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
52 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
53 executor.run_until_parked();
54
55 // Client B joins the channel buffer
56 let channel_buffer_b = client_b
57 .channel_store()
58 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
59 .await
60 .unwrap();
61 channel_buffer_b.read_with(cx_b, |buffer, _| {
62 assert_collaborators(
63 buffer.collaborators(),
64 &[client_a.user_id(), client_b.user_id()],
65 );
66 });
67
68 // Client B sees the correct text, and then edits it
69 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
70 assert_eq!(
71 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
72 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
73 );
74 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
75 buffer_b.update(cx_b, |buffer, cx| {
76 buffer.edit([(7..12, "beautiful")], None, cx)
77 });
78
79 // Both A and B see the new edit
80 executor.run_until_parked();
81 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
82 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
83
84 // Client A closes the channel buffer.
85 cx_a.update(|_| drop(channel_buffer_a));
86 executor.run_until_parked();
87
88 // Client B sees that client A is gone from the channel buffer.
89 channel_buffer_b.read_with(cx_b, |buffer, _| {
90 assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
91 });
92
93 // Client A rejoins the channel buffer
94 let _channel_buffer_a = client_a
95 .channel_store()
96 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
97 .await
98 .unwrap();
99 executor.run_until_parked();
100
101 // Sanity test, make sure we saw A rejoining
102 channel_buffer_b.read_with(cx_b, |buffer, _| {
103 assert_collaborators(
104 buffer.collaborators(),
105 &[client_a.user_id(), client_b.user_id()],
106 );
107 });
108
109 // Client A loses connection.
110 server.forbid_connections();
111 server.disconnect_client(client_a.peer_id().unwrap());
112 executor.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
113
114 // Client B observes A disconnect
115 channel_buffer_b.read_with(cx_b, |buffer, _| {
116 assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
117 });
118
119 // TODO:
120 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
121 // - Test interaction with channel deletion while buffer is open
122}
123
124#[gpui::test]
125async fn test_channel_notes_participant_indices(
126 executor: BackgroundExecutor,
127 cx_a: &mut TestAppContext,
128 cx_b: &mut TestAppContext,
129 cx_c: &mut TestAppContext,
130) {
131 let mut server = TestServer::start(executor.clone()).await;
132 let client_a = server.create_client(cx_a, "user_a").await;
133 let client_b = server.create_client(cx_b, "user_b").await;
134 let client_c = server.create_client(cx_c, "user_c").await;
135
136 let active_call_a = cx_a.read(ActiveCall::global);
137 let active_call_b = cx_b.read(ActiveCall::global);
138
139 cx_a.update(editor::init);
140 cx_b.update(editor::init);
141 cx_c.update(editor::init);
142
143 let channel_id = server
144 .make_channel(
145 "the-channel",
146 None,
147 (&client_a, cx_a),
148 &mut [(&client_b, cx_b), (&client_c, cx_c)],
149 )
150 .await;
151
152 client_a
153 .fs()
154 .insert_tree("/root", json!({"file.txt": "123"}))
155 .await;
156 let (project_a, worktree_id_a) = client_a.build_local_project_with_trust("/root", cx_a).await;
157 let project_b = client_b.build_empty_local_project(false, cx_b);
158 let project_c = client_c.build_empty_local_project(false, cx_c);
159
160 let (workspace_a, mut cx_a) = client_a.build_workspace(&project_a, cx_a);
161 let (workspace_b, mut cx_b) = client_b.build_workspace(&project_b, cx_b);
162 let (workspace_c, cx_c) = client_c.build_workspace(&project_c, cx_c);
163
164 // Clients A, B, and C open the channel notes
165 let channel_view_a = cx_a
166 .update(|window, cx| ChannelView::open(channel_id, None, workspace_a.clone(), window, cx))
167 .await
168 .unwrap();
169 let channel_view_b = cx_b
170 .update(|window, cx| ChannelView::open(channel_id, None, workspace_b.clone(), window, cx))
171 .await
172 .unwrap();
173 let channel_view_c = cx_c
174 .update(|window, cx| ChannelView::open(channel_id, None, workspace_c.clone(), window, cx))
175 .await
176 .unwrap();
177
178 // Clients A, B, and C all insert and select some text
179 channel_view_a.update_in(cx_a, |notes, window, cx| {
180 notes.editor.update(cx, |editor, cx| {
181 editor.insert("a", window, cx);
182 editor.change_selections(Default::default(), window, cx, |selections| {
183 selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
184 });
185 });
186 });
187 executor.run_until_parked();
188 channel_view_b.update_in(cx_b, |notes, window, cx| {
189 notes.editor.update(cx, |editor, cx| {
190 editor.move_down(&Default::default(), window, cx);
191 editor.insert("b", window, cx);
192 editor.change_selections(Default::default(), window, cx, |selections| {
193 selections.select_ranges(vec![MultiBufferOffset(1)..MultiBufferOffset(2)]);
194 });
195 });
196 });
197 executor.run_until_parked();
198 channel_view_c.update_in(cx_c, |notes, window, cx| {
199 notes.editor.update(cx, |editor, cx| {
200 editor.move_down(&Default::default(), window, cx);
201 editor.insert("c", window, cx);
202 editor.change_selections(Default::default(), window, cx, |selections| {
203 selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
204 });
205 });
206 });
207
208 // Client A sees clients B and C without assigned colors, because they aren't
209 // in a call together.
210 executor.run_until_parked();
211 channel_view_a.update_in(cx_a, |notes, window, cx| {
212 notes.editor.update(cx, |editor, cx| {
213 assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], window, cx);
214 });
215 });
216
217 // Clients A and B join the same call.
218 for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
219 call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
220 .await
221 .unwrap();
222 }
223
224 // Clients A and B see each other with two different assigned colors. Client C
225 // still doesn't have a color.
226 executor.run_until_parked();
227 channel_view_a.update_in(cx_a, |notes, window, cx| {
228 notes.editor.update(cx, |editor, cx| {
229 assert_remote_selections(
230 editor,
231 &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
232 window,
233 cx,
234 );
235 });
236 });
237 channel_view_b.update_in(cx_b, |notes, window, cx| {
238 notes.editor.update(cx, |editor, cx| {
239 assert_remote_selections(
240 editor,
241 &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
242 window,
243 cx,
244 );
245 });
246 });
247
248 // Client A shares a project, and client B joins.
249 let project_id = active_call_a
250 .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
251 .await
252 .unwrap();
253 let project_b = client_b.join_remote_project(project_id, cx_b).await;
254 let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
255
256 // Clients A and B open the same file.
257 let editor_a = workspace_a
258 .update_in(cx_a, |workspace, window, cx| {
259 workspace.open_path(
260 (worktree_id_a, rel_path("file.txt")),
261 None,
262 true,
263 window,
264 cx,
265 )
266 })
267 .await
268 .unwrap()
269 .downcast::<Editor>()
270 .unwrap();
271 let editor_b = workspace_b
272 .update_in(cx_b, |workspace, window, cx| {
273 workspace.open_path(
274 (worktree_id_a, rel_path("file.txt")),
275 None,
276 true,
277 window,
278 cx,
279 )
280 })
281 .await
282 .unwrap()
283 .downcast::<Editor>()
284 .unwrap();
285
286 editor_a.update_in(cx_a, |editor, window, cx| {
287 editor.change_selections(Default::default(), window, cx, |selections| {
288 selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
289 });
290 });
291 editor_b.update_in(cx_b, |editor, window, cx| {
292 editor.change_selections(Default::default(), window, cx, |selections| {
293 selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
294 });
295 });
296 executor.run_until_parked();
297
298 // Clients A and B see each other with the same colors as in the channel notes.
299 editor_a.update_in(cx_a, |editor, window, cx| {
300 assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], window, cx);
301 });
302 editor_b.update_in(cx_b, |editor, window, cx| {
303 assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], window, cx);
304 });
305}
306
307#[track_caller]
308fn assert_remote_selections(
309 editor: &mut Editor,
310 expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
311 window: &mut Window,
312 cx: &mut Context<Editor>,
313) {
314 let snapshot = editor.snapshot(window, cx);
315 let hub = editor.collaboration_hub().unwrap();
316 let collaborators = hub.collaborators(cx);
317 let range = Anchor::min()..Anchor::max();
318 let remote_selections = snapshot
319 .remote_selections_in_range(&range, hub, cx)
320 .map(|s| {
321 let CollaboratorId::PeerId(peer_id) = s.collaborator_id else {
322 panic!("unexpected collaborator id");
323 };
324 let start = s.selection.start.to_offset(snapshot.buffer_snapshot());
325 let end = s.selection.end.to_offset(snapshot.buffer_snapshot());
326 let user_id = collaborators.get(&peer_id).unwrap().user_id;
327 let participant_index = hub.user_participant_indices(cx).get(&user_id).copied();
328 (participant_index, start.0..end.0)
329 })
330 .collect::<Vec<_>>();
331 assert_eq!(
332 remote_selections, expected_selections,
333 "incorrect remote selections"
334 );
335}
336
337#[gpui::test]
338async fn test_multiple_handles_to_channel_buffer(
339 deterministic: BackgroundExecutor,
340 cx_a: &mut TestAppContext,
341) {
342 let mut server = TestServer::start(deterministic.clone()).await;
343 let client_a = server.create_client(cx_a, "user_a").await;
344
345 let channel_id = server
346 .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
347 .await;
348
349 let channel_buffer_1 = client_a
350 .channel_store()
351 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
352 let channel_buffer_2 = client_a
353 .channel_store()
354 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
355 let channel_buffer_3 = client_a
356 .channel_store()
357 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
358
359 // All concurrent tasks for opening a channel buffer return the same model handle.
360 let (channel_buffer, channel_buffer_2, channel_buffer_3) =
361 future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
362 .await
363 .unwrap();
364 let channel_buffer_entity_id = channel_buffer.entity_id();
365 assert_eq!(channel_buffer, channel_buffer_2);
366 assert_eq!(channel_buffer, channel_buffer_3);
367
368 channel_buffer.update(cx_a, |buffer, cx| {
369 buffer.buffer().update(cx, |buffer, cx| {
370 buffer.edit([(0..0, "hello")], None, cx);
371 })
372 });
373 deterministic.run_until_parked();
374
375 cx_a.update(|_| {
376 drop(channel_buffer);
377 drop(channel_buffer_2);
378 drop(channel_buffer_3);
379 });
380 deterministic.run_until_parked();
381
382 // The channel buffer can be reopened after dropping it.
383 let channel_buffer = client_a
384 .channel_store()
385 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
386 .await
387 .unwrap();
388 assert_ne!(channel_buffer.entity_id(), channel_buffer_entity_id);
389 channel_buffer.update(cx_a, |buffer, cx| {
390 buffer.buffer().update(cx, |buffer, _| {
391 assert_eq!(buffer.text(), "hello");
392 })
393 });
394}
395
396#[gpui::test]
397async fn test_channel_buffer_disconnect(
398 deterministic: BackgroundExecutor,
399 cx_a: &mut TestAppContext,
400 cx_b: &mut TestAppContext,
401) {
402 let mut server = TestServer::start(deterministic.clone()).await;
403 let client_a = server.create_client(cx_a, "user_a").await;
404 let client_b = server.create_client(cx_b, "user_b").await;
405
406 let channel_id = server
407 .make_channel(
408 "the-channel",
409 None,
410 (&client_a, cx_a),
411 &mut [(&client_b, cx_b)],
412 )
413 .await;
414
415 let channel_buffer_a = client_a
416 .channel_store()
417 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
418 .await
419 .unwrap();
420
421 let channel_buffer_b = client_b
422 .channel_store()
423 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
424 .await
425 .unwrap();
426
427 server.forbid_connections();
428 server.disconnect_client(client_a.peer_id().unwrap());
429 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
430
431 channel_buffer_a.update(cx_a, |buffer, cx| {
432 assert_eq!(buffer.channel(cx).unwrap().name, "the-channel");
433 assert!(!buffer.is_connected());
434 });
435
436 deterministic.run_until_parked();
437
438 server.allow_connections();
439 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
440
441 deterministic.run_until_parked();
442
443 client_a
444 .channel_store()
445 .update(cx_a, |channel_store, _| {
446 channel_store.remove_channel(channel_id)
447 })
448 .await
449 .unwrap();
450 deterministic.run_until_parked();
451
452 // Channel buffer observed the deletion
453 channel_buffer_b.update(cx_b, |buffer, cx| {
454 assert!(buffer.channel(cx).is_none());
455 assert!(!buffer.is_connected());
456 });
457}
458
459#[gpui::test]
460async fn test_rejoin_channel_buffer(
461 deterministic: BackgroundExecutor,
462 cx_a: &mut TestAppContext,
463 cx_b: &mut TestAppContext,
464) {
465 let mut server = TestServer::start(deterministic.clone()).await;
466 let client_a = server.create_client(cx_a, "user_a").await;
467 let client_b = server.create_client(cx_b, "user_b").await;
468
469 let channel_id = server
470 .make_channel(
471 "the-channel",
472 None,
473 (&client_a, cx_a),
474 &mut [(&client_b, cx_b)],
475 )
476 .await;
477
478 let channel_buffer_a = client_a
479 .channel_store()
480 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
481 .await
482 .unwrap();
483 let channel_buffer_b = client_b
484 .channel_store()
485 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
486 .await
487 .unwrap();
488
489 channel_buffer_a.update(cx_a, |buffer, cx| {
490 buffer.buffer().update(cx, |buffer, cx| {
491 buffer.edit([(0..0, "1")], None, cx);
492 })
493 });
494 deterministic.run_until_parked();
495
496 // Client A disconnects.
497 server.forbid_connections();
498 server.disconnect_client(client_a.peer_id().unwrap());
499
500 // Both clients make an edit.
501 channel_buffer_a.update(cx_a, |buffer, cx| {
502 buffer.buffer().update(cx, |buffer, cx| {
503 buffer.edit([(1..1, "2")], None, cx);
504 })
505 });
506 channel_buffer_b.update(cx_b, |buffer, cx| {
507 buffer.buffer().update(cx, |buffer, cx| {
508 buffer.edit([(0..0, "0")], None, cx);
509 })
510 });
511
512 // Both clients see their own edit.
513 deterministic.run_until_parked();
514 channel_buffer_a.read_with(cx_a, |buffer, cx| {
515 assert_eq!(buffer.buffer().read(cx).text(), "12");
516 });
517 channel_buffer_b.read_with(cx_b, |buffer, cx| {
518 assert_eq!(buffer.buffer().read(cx).text(), "01");
519 });
520
521 // Client A reconnects. Both clients see each other's edits, and see
522 // the same collaborators.
523 server.allow_connections();
524 deterministic.advance_clock(RECEIVE_TIMEOUT);
525 channel_buffer_a.read_with(cx_a, |buffer, cx| {
526 assert_eq!(buffer.buffer().read(cx).text(), "012");
527 });
528 channel_buffer_b.read_with(cx_b, |buffer, cx| {
529 assert_eq!(buffer.buffer().read(cx).text(), "012");
530 });
531
532 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
533 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
534 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
535 });
536 });
537}
538
539#[gpui::test]
540async fn test_channel_buffers_and_server_restarts(
541 deterministic: BackgroundExecutor,
542 cx_a: &mut TestAppContext,
543 cx_b: &mut TestAppContext,
544 cx_c: &mut TestAppContext,
545) {
546 let mut server = TestServer::start(deterministic.clone()).await;
547 let client_a = server.create_client(cx_a, "user_a").await;
548 let client_b = server.create_client(cx_b, "user_b").await;
549 let client_c = server.create_client(cx_c, "user_c").await;
550
551 let channel_id = server
552 .make_channel(
553 "the-channel",
554 None,
555 (&client_a, cx_a),
556 &mut [(&client_b, cx_b), (&client_c, cx_c)],
557 )
558 .await;
559
560 let channel_buffer_a = client_a
561 .channel_store()
562 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
563 .await
564 .unwrap();
565 let channel_buffer_b = client_b
566 .channel_store()
567 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
568 .await
569 .unwrap();
570 let _channel_buffer_c = client_c
571 .channel_store()
572 .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
573 .await
574 .unwrap();
575
576 channel_buffer_a.update(cx_a, |buffer, cx| {
577 buffer.buffer().update(cx, |buffer, cx| {
578 buffer.edit([(0..0, "1")], None, cx);
579 })
580 });
581 deterministic.run_until_parked();
582
583 // Client C can't reconnect.
584 client_c.override_establish_connection(|_, cx| cx.spawn(async |_| future::pending().await));
585
586 // Server stops.
587 server.reset().await;
588 deterministic.advance_clock(RECEIVE_TIMEOUT);
589
590 // While the server is down, both clients make an edit.
591 channel_buffer_a.update(cx_a, |buffer, cx| {
592 buffer.buffer().update(cx, |buffer, cx| {
593 buffer.edit([(1..1, "2")], None, cx);
594 })
595 });
596 channel_buffer_b.update(cx_b, |buffer, cx| {
597 buffer.buffer().update(cx, |buffer, cx| {
598 buffer.edit([(0..0, "0")], None, cx);
599 })
600 });
601
602 // Server restarts.
603 server.start().await.unwrap();
604 deterministic.advance_clock(CLEANUP_TIMEOUT);
605
606 // Clients reconnects. Clients A and B see each other's edits, and see
607 // that client C has disconnected.
608 channel_buffer_a.read_with(cx_a, |buffer, cx| {
609 assert_eq!(buffer.buffer().read(cx).text(), "012");
610 });
611 channel_buffer_b.read_with(cx_b, |buffer, cx| {
612 assert_eq!(buffer.buffer().read(cx).text(), "012");
613 });
614
615 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
616 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
617 assert_collaborators(
618 buffer_a.collaborators(),
619 &[client_a.user_id(), client_b.user_id()],
620 );
621 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
622 });
623 });
624}
625
626#[gpui::test]
627async fn test_channel_buffer_changes(
628 deterministic: BackgroundExecutor,
629 cx_a: &mut TestAppContext,
630 cx_b: &mut TestAppContext,
631) {
632 let (server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
633 let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
634 let (workspace_b, cx_b) = client_b.build_test_workspace(cx_b).await;
635 let channel_store_b = client_b.channel_store().clone();
636
637 // Editing the channel notes should set them to dirty
638 open_channel_notes(channel_id, cx_a).await.unwrap();
639 cx_a.simulate_keystrokes("1");
640 channel_store_b.read_with(cx_b, |channel_store, _| {
641 assert!(channel_store.has_channel_buffer_changed(channel_id))
642 });
643
644 // Opening the buffer should clear the changed flag.
645 open_channel_notes(channel_id, cx_b).await.unwrap();
646 channel_store_b.read_with(cx_b, |channel_store, _| {
647 assert!(!channel_store.has_channel_buffer_changed(channel_id))
648 });
649
650 // Editing the channel while the buffer is open should not show that the buffer has changed.
651 cx_a.simulate_keystrokes("2");
652 channel_store_b.read_with(cx_b, |channel_store, _| {
653 assert!(!channel_store.has_channel_buffer_changed(channel_id))
654 });
655
656 // Test that the server is tracking things correctly, and we retain our 'not changed'
657 // state across a disconnect
658 deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
659 server
660 .simulate_long_connection_interruption(client_b.peer_id().unwrap(), deterministic.clone());
661 channel_store_b.read_with(cx_b, |channel_store, _| {
662 assert!(!channel_store.has_channel_buffer_changed(channel_id))
663 });
664
665 // Closing the buffer should re-enable change tracking
666 cx_b.update(|window, cx| {
667 workspace_b.update(cx, |workspace, cx| {
668 workspace.close_all_items_and_panes(&Default::default(), window, cx)
669 });
670 });
671 deterministic.run_until_parked();
672
673 cx_a.simulate_keystrokes("3");
674 channel_store_b.read_with(cx_b, |channel_store, _| {
675 assert!(channel_store.has_channel_buffer_changed(channel_id))
676 });
677}
678
679#[gpui::test]
680async fn test_channel_buffer_changes_persist(
681 cx_a: &mut TestAppContext,
682 cx_b: &mut TestAppContext,
683 cx_b2: &mut TestAppContext,
684) {
685 let (mut server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
686 let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
687 let (_, cx_b) = client_b.build_test_workspace(cx_b).await;
688
689 // a) edits the notes
690 open_channel_notes(channel_id, cx_a).await.unwrap();
691 cx_a.simulate_keystrokes("1");
692 // b) opens them to observe the current version
693 open_channel_notes(channel_id, cx_b).await.unwrap();
694
695 // On boot the client should get the correct state.
696 let client_b2 = server.create_client(cx_b2, "user_b").await;
697 let channel_store_b2 = client_b2.channel_store().clone();
698 channel_store_b2.read_with(cx_b2, |channel_store, _| {
699 assert!(!channel_store.has_channel_buffer_changed(channel_id))
700 });
701}
702
703#[track_caller]
704fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
705 let mut user_ids = collaborators
706 .values()
707 .map(|collaborator| collaborator.user_id)
708 .collect::<Vec<_>>();
709 user_ids.sort();
710 assert_eq!(
711 user_ids,
712 ids.iter().map(|id| id.unwrap()).collect::<Vec<_>>()
713 );
714}
715
716fn buffer_text(channel_buffer: &Entity<language::Buffer>, cx: &mut TestAppContext) -> String {
717 channel_buffer.read_with(cx, |buffer, _| buffer.text())
718}