channel_buffer_tests.rs

  1use crate::{
  2    rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
  3    tests::{TestServer, test_server::open_channel_notes},
  4};
  5use call::ActiveCall;
  6use channel::ACKNOWLEDGE_DEBOUNCE_INTERVAL;
  7use client::{Collaborator, ParticipantIndex, UserId};
  8use collab_ui::channel_view::ChannelView;
  9use collections::HashMap;
 10use editor::{Anchor, Editor, MultiBufferOffset, ToOffset};
 11use futures::future;
 12use gpui::{BackgroundExecutor, Context, Entity, TestAppContext, Window};
 13use rpc::{RECEIVE_TIMEOUT, proto::PeerId};
 14use serde_json::json;
 15use std::ops::Range;
 16use util::rel_path::rel_path;
 17use workspace::CollaboratorId;
 18
 19#[gpui::test]
 20async fn test_core_channel_buffers(
 21    executor: BackgroundExecutor,
 22    cx_a: &mut TestAppContext,
 23    cx_b: &mut TestAppContext,
 24) {
 25    let mut server = TestServer::start(executor.clone()).await;
 26    let client_a = server.create_client(cx_a, "user_a").await;
 27    let client_b = server.create_client(cx_b, "user_b").await;
 28
 29    let channel_id = server
 30        .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
 31        .await;
 32
 33    // Client A joins the channel buffer
 34    let channel_buffer_a = client_a
 35        .channel_store()
 36        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 37        .await
 38        .unwrap();
 39
 40    // Client A edits the buffer
 41    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 42    buffer_a.update(cx_a, |buffer, cx| {
 43        buffer.edit([(0..0, "hello world")], None, cx)
 44    });
 45    buffer_a.update(cx_a, |buffer, cx| {
 46        buffer.edit([(5..5, ", cruel")], None, cx)
 47    });
 48    buffer_a.update(cx_a, |buffer, cx| {
 49        buffer.edit([(0..5, "goodbye")], None, cx)
 50    });
 51    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 52    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 53    executor.run_until_parked();
 54
 55    // Client B joins the channel buffer
 56    let channel_buffer_b = client_b
 57        .channel_store()
 58        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 59        .await
 60        .unwrap();
 61    channel_buffer_b.read_with(cx_b, |buffer, _| {
 62        assert_collaborators(
 63            buffer.collaborators(),
 64            &[client_a.user_id(), client_b.user_id()],
 65        );
 66    });
 67
 68    // Client B sees the correct text, and then edits it
 69    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 70    assert_eq!(
 71        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 72        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 73    );
 74    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 75    buffer_b.update(cx_b, |buffer, cx| {
 76        buffer.edit([(7..12, "beautiful")], None, cx)
 77    });
 78
 79    // Both A and B see the new edit
 80    executor.run_until_parked();
 81    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 82    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 83
 84    // Client A closes the channel buffer.
 85    cx_a.update(|_| drop(channel_buffer_a));
 86    executor.run_until_parked();
 87
 88    // Client B sees that client A is gone from the channel buffer.
 89    channel_buffer_b.read_with(cx_b, |buffer, _| {
 90        assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
 91    });
 92
 93    // Client A rejoins the channel buffer
 94    let _channel_buffer_a = client_a
 95        .channel_store()
 96        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 97        .await
 98        .unwrap();
 99    executor.run_until_parked();
100
101    // Sanity test, make sure we saw A rejoining
102    channel_buffer_b.read_with(cx_b, |buffer, _| {
103        assert_collaborators(
104            buffer.collaborators(),
105            &[client_a.user_id(), client_b.user_id()],
106        );
107    });
108
109    // Client A loses connection.
110    server.forbid_connections();
111    server.disconnect_client(client_a.peer_id().unwrap());
112    executor.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
113
114    // Client B observes A disconnect
115    channel_buffer_b.read_with(cx_b, |buffer, _| {
116        assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
117    });
118
119    // TODO:
120    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
121    // - Test interaction with channel deletion while buffer is open
122}
123
124#[gpui::test]
125async fn test_channel_notes_participant_indices(
126    executor: BackgroundExecutor,
127    cx_a: &mut TestAppContext,
128    cx_b: &mut TestAppContext,
129    cx_c: &mut TestAppContext,
130) {
131    let mut server = TestServer::start(executor.clone()).await;
132    let client_a = server.create_client(cx_a, "user_a").await;
133    let client_b = server.create_client(cx_b, "user_b").await;
134    let client_c = server.create_client(cx_c, "user_c").await;
135
136    let active_call_a = cx_a.read(ActiveCall::global);
137    let active_call_b = cx_b.read(ActiveCall::global);
138
139    cx_a.update(editor::init);
140    cx_b.update(editor::init);
141    cx_c.update(editor::init);
142
143    let channel_id = server
144        .make_channel(
145            "the-channel",
146            None,
147            (&client_a, cx_a),
148            &mut [(&client_b, cx_b), (&client_c, cx_c)],
149        )
150        .await;
151
152    client_a
153        .fs()
154        .insert_tree("/root", json!({"file.txt": "123"}))
155        .await;
156    let (project_a, worktree_id_a) = client_a.build_local_project_with_trust("/root", cx_a).await;
157    let project_b = client_b.build_empty_local_project(false, cx_b);
158    let project_c = client_c.build_empty_local_project(false, cx_c);
159
160    let (workspace_a, mut cx_a) = client_a.build_workspace(&project_a, cx_a);
161    let (workspace_b, mut cx_b) = client_b.build_workspace(&project_b, cx_b);
162    let (workspace_c, cx_c) = client_c.build_workspace(&project_c, cx_c);
163
164    // Clients A, B, and C open the channel notes
165    let channel_view_a = cx_a
166        .update(|window, cx| ChannelView::open(channel_id, None, workspace_a.clone(), window, cx))
167        .await
168        .unwrap();
169    let channel_view_b = cx_b
170        .update(|window, cx| ChannelView::open(channel_id, None, workspace_b.clone(), window, cx))
171        .await
172        .unwrap();
173    let channel_view_c = cx_c
174        .update(|window, cx| ChannelView::open(channel_id, None, workspace_c.clone(), window, cx))
175        .await
176        .unwrap();
177
178    // Clients A, B, and C all insert and select some text
179    channel_view_a.update_in(cx_a, |notes, window, cx| {
180        notes.editor.update(cx, |editor, cx| {
181            editor.insert("a", window, cx);
182            editor.change_selections(Default::default(), window, cx, |selections| {
183                selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
184            });
185        });
186    });
187    executor.run_until_parked();
188    channel_view_b.update_in(cx_b, |notes, window, cx| {
189        notes.editor.update(cx, |editor, cx| {
190            editor.move_down(&Default::default(), window, cx);
191            editor.insert("b", window, cx);
192            editor.change_selections(Default::default(), window, cx, |selections| {
193                selections.select_ranges(vec![MultiBufferOffset(1)..MultiBufferOffset(2)]);
194            });
195        });
196    });
197    executor.run_until_parked();
198    channel_view_c.update_in(cx_c, |notes, window, cx| {
199        notes.editor.update(cx, |editor, cx| {
200            editor.move_down(&Default::default(), window, cx);
201            editor.insert("c", window, cx);
202            editor.change_selections(Default::default(), window, cx, |selections| {
203                selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
204            });
205        });
206    });
207
208    // Client A sees clients B and C without assigned colors, because they aren't
209    // in a call together.
210    executor.run_until_parked();
211    channel_view_a.update_in(cx_a, |notes, window, cx| {
212        notes.editor.update(cx, |editor, cx| {
213            assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], window, cx);
214        });
215    });
216
217    // Clients A and B join the same call.
218    for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
219        call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
220            .await
221            .unwrap();
222    }
223
224    // Clients A and B see each other with two different assigned colors. Client C
225    // still doesn't have a color.
226    executor.run_until_parked();
227    channel_view_a.update_in(cx_a, |notes, window, cx| {
228        notes.editor.update(cx, |editor, cx| {
229            assert_remote_selections(
230                editor,
231                &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
232                window,
233                cx,
234            );
235        });
236    });
237    channel_view_b.update_in(cx_b, |notes, window, cx| {
238        notes.editor.update(cx, |editor, cx| {
239            assert_remote_selections(
240                editor,
241                &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
242                window,
243                cx,
244            );
245        });
246    });
247
248    // Client A shares a project, and client B joins.
249    let project_id = active_call_a
250        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
251        .await
252        .unwrap();
253    let project_b = client_b.join_remote_project(project_id, cx_b).await;
254    let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
255
256    // Clients A and B open the same file.
257    let editor_a = workspace_a
258        .update_in(cx_a, |workspace, window, cx| {
259            workspace.open_path(
260                (worktree_id_a, rel_path("file.txt")),
261                None,
262                true,
263                window,
264                cx,
265            )
266        })
267        .await
268        .unwrap()
269        .downcast::<Editor>()
270        .unwrap();
271    let editor_b = workspace_b
272        .update_in(cx_b, |workspace, window, cx| {
273            workspace.open_path(
274                (worktree_id_a, rel_path("file.txt")),
275                None,
276                true,
277                window,
278                cx,
279            )
280        })
281        .await
282        .unwrap()
283        .downcast::<Editor>()
284        .unwrap();
285
286    editor_a.update_in(cx_a, |editor, window, cx| {
287        editor.change_selections(Default::default(), window, cx, |selections| {
288            selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
289        });
290    });
291    editor_b.update_in(cx_b, |editor, window, cx| {
292        editor.change_selections(Default::default(), window, cx, |selections| {
293            selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
294        });
295    });
296    executor.run_until_parked();
297
298    // Clients A and B see each other with the same colors as in the channel notes.
299    editor_a.update_in(cx_a, |editor, window, cx| {
300        assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], window, cx);
301    });
302    editor_b.update_in(cx_b, |editor, window, cx| {
303        assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], window, cx);
304    });
305}
306
307#[track_caller]
308fn assert_remote_selections(
309    editor: &mut Editor,
310    expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
311    window: &mut Window,
312    cx: &mut Context<Editor>,
313) {
314    let snapshot = editor.snapshot(window, cx);
315    let hub = editor.collaboration_hub().unwrap();
316    let collaborators = hub.collaborators(cx);
317    let range = Anchor::min()..Anchor::max();
318    let remote_selections = snapshot
319        .remote_selections_in_range(&range, hub, cx)
320        .map(|s| {
321            let CollaboratorId::PeerId(peer_id) = s.collaborator_id else {
322                panic!("unexpected collaborator id");
323            };
324            let start = s.selection.start.to_offset(snapshot.buffer_snapshot());
325            let end = s.selection.end.to_offset(snapshot.buffer_snapshot());
326            let user_id = collaborators.get(&peer_id).unwrap().user_id;
327            let participant_index = hub.user_participant_indices(cx).get(&user_id).copied();
328            (participant_index, start.0..end.0)
329        })
330        .collect::<Vec<_>>();
331    assert_eq!(
332        remote_selections, expected_selections,
333        "incorrect remote selections"
334    );
335}
336
337#[gpui::test]
338async fn test_multiple_handles_to_channel_buffer(
339    deterministic: BackgroundExecutor,
340    cx_a: &mut TestAppContext,
341) {
342    let mut server = TestServer::start(deterministic.clone()).await;
343    let client_a = server.create_client(cx_a, "user_a").await;
344
345    let channel_id = server
346        .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
347        .await;
348
349    let channel_buffer_1 = client_a
350        .channel_store()
351        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
352    let channel_buffer_2 = client_a
353        .channel_store()
354        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
355    let channel_buffer_3 = client_a
356        .channel_store()
357        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
358
359    // All concurrent tasks for opening a channel buffer return the same model handle.
360    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
361        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
362            .await
363            .unwrap();
364    let channel_buffer_entity_id = channel_buffer.entity_id();
365    assert_eq!(channel_buffer, channel_buffer_2);
366    assert_eq!(channel_buffer, channel_buffer_3);
367
368    channel_buffer.update(cx_a, |buffer, cx| {
369        buffer.buffer().update(cx, |buffer, cx| {
370            buffer.edit([(0..0, "hello")], None, cx);
371        })
372    });
373    deterministic.run_until_parked();
374
375    cx_a.update(|_| {
376        drop(channel_buffer);
377        drop(channel_buffer_2);
378        drop(channel_buffer_3);
379    });
380    deterministic.run_until_parked();
381
382    // The channel buffer can be reopened after dropping it.
383    let channel_buffer = client_a
384        .channel_store()
385        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
386        .await
387        .unwrap();
388    assert_ne!(channel_buffer.entity_id(), channel_buffer_entity_id);
389    channel_buffer.update(cx_a, |buffer, cx| {
390        buffer.buffer().update(cx, |buffer, _| {
391            assert_eq!(buffer.text(), "hello");
392        })
393    });
394}
395
396#[gpui::test]
397async fn test_channel_buffer_disconnect(
398    deterministic: BackgroundExecutor,
399    cx_a: &mut TestAppContext,
400    cx_b: &mut TestAppContext,
401) {
402    let mut server = TestServer::start(deterministic.clone()).await;
403    let client_a = server.create_client(cx_a, "user_a").await;
404    let client_b = server.create_client(cx_b, "user_b").await;
405
406    let channel_id = server
407        .make_channel(
408            "the-channel",
409            None,
410            (&client_a, cx_a),
411            &mut [(&client_b, cx_b)],
412        )
413        .await;
414
415    let channel_buffer_a = client_a
416        .channel_store()
417        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
418        .await
419        .unwrap();
420
421    let channel_buffer_b = client_b
422        .channel_store()
423        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
424        .await
425        .unwrap();
426
427    server.forbid_connections();
428    server.disconnect_client(client_a.peer_id().unwrap());
429    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
430
431    channel_buffer_a.update(cx_a, |buffer, cx| {
432        assert_eq!(buffer.channel(cx).unwrap().name, "the-channel");
433        assert!(!buffer.is_connected());
434    });
435
436    deterministic.run_until_parked();
437
438    server.allow_connections();
439    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
440
441    deterministic.run_until_parked();
442
443    client_a
444        .channel_store()
445        .update(cx_a, |channel_store, _| {
446            channel_store.remove_channel(channel_id)
447        })
448        .await
449        .unwrap();
450    deterministic.run_until_parked();
451
452    // Channel buffer observed the deletion
453    channel_buffer_b.update(cx_b, |buffer, cx| {
454        assert!(buffer.channel(cx).is_none());
455        assert!(!buffer.is_connected());
456    });
457}
458
459#[gpui::test]
460async fn test_rejoin_channel_buffer(
461    deterministic: BackgroundExecutor,
462    cx_a: &mut TestAppContext,
463    cx_b: &mut TestAppContext,
464) {
465    let mut server = TestServer::start(deterministic.clone()).await;
466    let client_a = server.create_client(cx_a, "user_a").await;
467    let client_b = server.create_client(cx_b, "user_b").await;
468
469    let channel_id = server
470        .make_channel(
471            "the-channel",
472            None,
473            (&client_a, cx_a),
474            &mut [(&client_b, cx_b)],
475        )
476        .await;
477
478    let channel_buffer_a = client_a
479        .channel_store()
480        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
481        .await
482        .unwrap();
483    let channel_buffer_b = client_b
484        .channel_store()
485        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
486        .await
487        .unwrap();
488
489    channel_buffer_a.update(cx_a, |buffer, cx| {
490        buffer.buffer().update(cx, |buffer, cx| {
491            buffer.edit([(0..0, "1")], None, cx);
492        })
493    });
494    deterministic.run_until_parked();
495
496    // Client A disconnects.
497    server.forbid_connections();
498    server.disconnect_client(client_a.peer_id().unwrap());
499
500    // Both clients make an edit.
501    channel_buffer_a.update(cx_a, |buffer, cx| {
502        buffer.buffer().update(cx, |buffer, cx| {
503            buffer.edit([(1..1, "2")], None, cx);
504        })
505    });
506    channel_buffer_b.update(cx_b, |buffer, cx| {
507        buffer.buffer().update(cx, |buffer, cx| {
508            buffer.edit([(0..0, "0")], None, cx);
509        })
510    });
511
512    // Both clients see their own edit.
513    deterministic.run_until_parked();
514    channel_buffer_a.read_with(cx_a, |buffer, cx| {
515        assert_eq!(buffer.buffer().read(cx).text(), "12");
516    });
517    channel_buffer_b.read_with(cx_b, |buffer, cx| {
518        assert_eq!(buffer.buffer().read(cx).text(), "01");
519    });
520
521    // Client A reconnects. Both clients see each other's edits, and see
522    // the same collaborators.
523    server.allow_connections();
524    deterministic.advance_clock(RECEIVE_TIMEOUT);
525    channel_buffer_a.read_with(cx_a, |buffer, cx| {
526        assert_eq!(buffer.buffer().read(cx).text(), "012");
527    });
528    channel_buffer_b.read_with(cx_b, |buffer, cx| {
529        assert_eq!(buffer.buffer().read(cx).text(), "012");
530    });
531
532    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
533        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
534            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
535        });
536    });
537}
538
539#[gpui::test]
540async fn test_channel_buffers_and_server_restarts(
541    deterministic: BackgroundExecutor,
542    cx_a: &mut TestAppContext,
543    cx_b: &mut TestAppContext,
544    cx_c: &mut TestAppContext,
545) {
546    let mut server = TestServer::start(deterministic.clone()).await;
547    let client_a = server.create_client(cx_a, "user_a").await;
548    let client_b = server.create_client(cx_b, "user_b").await;
549    let client_c = server.create_client(cx_c, "user_c").await;
550
551    let channel_id = server
552        .make_channel(
553            "the-channel",
554            None,
555            (&client_a, cx_a),
556            &mut [(&client_b, cx_b), (&client_c, cx_c)],
557        )
558        .await;
559
560    let channel_buffer_a = client_a
561        .channel_store()
562        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
563        .await
564        .unwrap();
565    let channel_buffer_b = client_b
566        .channel_store()
567        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
568        .await
569        .unwrap();
570    let _channel_buffer_c = client_c
571        .channel_store()
572        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
573        .await
574        .unwrap();
575
576    channel_buffer_a.update(cx_a, |buffer, cx| {
577        buffer.buffer().update(cx, |buffer, cx| {
578            buffer.edit([(0..0, "1")], None, cx);
579        })
580    });
581    deterministic.run_until_parked();
582
583    // Client C can't reconnect.
584    client_c.override_establish_connection(|_, cx| cx.spawn(async |_| future::pending().await));
585
586    // Server stops.
587    server.reset().await;
588    deterministic.advance_clock(RECEIVE_TIMEOUT);
589
590    // While the server is down, both clients make an edit.
591    channel_buffer_a.update(cx_a, |buffer, cx| {
592        buffer.buffer().update(cx, |buffer, cx| {
593            buffer.edit([(1..1, "2")], None, cx);
594        })
595    });
596    channel_buffer_b.update(cx_b, |buffer, cx| {
597        buffer.buffer().update(cx, |buffer, cx| {
598            buffer.edit([(0..0, "0")], None, cx);
599        })
600    });
601
602    // Server restarts.
603    server.start().await.unwrap();
604    deterministic.advance_clock(CLEANUP_TIMEOUT);
605
606    // Clients reconnects. Clients A and B see each other's edits, and see
607    // that client C has disconnected.
608    channel_buffer_a.read_with(cx_a, |buffer, cx| {
609        assert_eq!(buffer.buffer().read(cx).text(), "012");
610    });
611    channel_buffer_b.read_with(cx_b, |buffer, cx| {
612        assert_eq!(buffer.buffer().read(cx).text(), "012");
613    });
614
615    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
616        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
617            assert_collaborators(
618                buffer_a.collaborators(),
619                &[client_a.user_id(), client_b.user_id()],
620            );
621            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
622        });
623    });
624}
625
626#[gpui::test]
627async fn test_channel_buffer_changes(
628    deterministic: BackgroundExecutor,
629    cx_a: &mut TestAppContext,
630    cx_b: &mut TestAppContext,
631) {
632    let (server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
633    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
634    let (workspace_b, cx_b) = client_b.build_test_workspace(cx_b).await;
635    let channel_store_b = client_b.channel_store().clone();
636
637    // Editing the channel notes should set them to dirty
638    open_channel_notes(channel_id, cx_a).await.unwrap();
639    cx_a.simulate_keystrokes("1");
640    channel_store_b.read_with(cx_b, |channel_store, _| {
641        assert!(channel_store.has_channel_buffer_changed(channel_id))
642    });
643
644    // Opening the buffer should clear the changed flag.
645    open_channel_notes(channel_id, cx_b).await.unwrap();
646    channel_store_b.read_with(cx_b, |channel_store, _| {
647        assert!(!channel_store.has_channel_buffer_changed(channel_id))
648    });
649
650    // Editing the channel while the buffer is open should not show that the buffer has changed.
651    cx_a.simulate_keystrokes("2");
652    channel_store_b.read_with(cx_b, |channel_store, _| {
653        assert!(!channel_store.has_channel_buffer_changed(channel_id))
654    });
655
656    // Test that the server is tracking things correctly, and we retain our 'not changed'
657    // state across a disconnect
658    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
659    server
660        .simulate_long_connection_interruption(client_b.peer_id().unwrap(), deterministic.clone());
661    channel_store_b.read_with(cx_b, |channel_store, _| {
662        assert!(!channel_store.has_channel_buffer_changed(channel_id))
663    });
664
665    // Closing the buffer should re-enable change tracking
666    cx_b.update(|window, cx| {
667        workspace_b.update(cx, |workspace, cx| {
668            workspace.close_all_items_and_panes(&Default::default(), window, cx)
669        });
670    });
671    deterministic.run_until_parked();
672
673    cx_a.simulate_keystrokes("3");
674    channel_store_b.read_with(cx_b, |channel_store, _| {
675        assert!(channel_store.has_channel_buffer_changed(channel_id))
676    });
677}
678
679#[gpui::test]
680async fn test_channel_buffer_changes_persist(
681    cx_a: &mut TestAppContext,
682    cx_b: &mut TestAppContext,
683    cx_b2: &mut TestAppContext,
684) {
685    let (mut server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
686    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
687    let (_, cx_b) = client_b.build_test_workspace(cx_b).await;
688
689    // a) edits the notes
690    open_channel_notes(channel_id, cx_a).await.unwrap();
691    cx_a.simulate_keystrokes("1");
692    // b) opens them to observe the current version
693    open_channel_notes(channel_id, cx_b).await.unwrap();
694
695    // On boot the client should get the correct state.
696    let client_b2 = server.create_client(cx_b2, "user_b").await;
697    let channel_store_b2 = client_b2.channel_store().clone();
698    channel_store_b2.read_with(cx_b2, |channel_store, _| {
699        assert!(!channel_store.has_channel_buffer_changed(channel_id))
700    });
701}
702
703#[track_caller]
704fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
705    let mut user_ids = collaborators
706        .values()
707        .map(|collaborator| collaborator.user_id)
708        .collect::<Vec<_>>();
709    user_ids.sort();
710    assert_eq!(
711        user_ids,
712        ids.iter().map(|id| id.unwrap()).collect::<Vec<_>>()
713    );
714}
715
716fn buffer_text(channel_buffer: &Entity<language::Buffer>, cx: &mut TestAppContext) -> String {
717    channel_buffer.read_with(cx, |buffer, _| buffer.text())
718}