channel_buffer_tests.rs

  1use crate::{TestServer, test_server::open_channel_notes};
  2use call::ActiveCall;
  3use channel::ACKNOWLEDGE_DEBOUNCE_INTERVAL;
  4use client::{Collaborator, ParticipantIndex, UserId};
  5use collab::rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT};
  6use collab_ui::channel_view::ChannelView;
  7use collections::HashMap;
  8use editor::{Anchor, Editor, MultiBufferOffset, ToOffset};
  9use futures::future;
 10use gpui::{BackgroundExecutor, Context, Entity, TestAppContext, Window};
 11use rpc::{RECEIVE_TIMEOUT, proto::PeerId};
 12use serde_json::json;
 13use std::ops::Range;
 14use util::rel_path::rel_path;
 15use workspace::CollaboratorId;
 16
 17#[gpui::test]
 18async fn test_core_channel_buffers(
 19    executor: BackgroundExecutor,
 20    cx_a: &mut TestAppContext,
 21    cx_b: &mut TestAppContext,
 22) {
 23    let mut server = TestServer::start(executor.clone()).await;
 24    let client_a = server.create_client(cx_a, "user_a").await;
 25    let client_b = server.create_client(cx_b, "user_b").await;
 26
 27    let channel_id = server
 28        .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
 29        .await;
 30
 31    // Client A joins the channel buffer
 32    let channel_buffer_a = client_a
 33        .channel_store()
 34        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 35        .await
 36        .unwrap();
 37
 38    // Client A edits the buffer
 39    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 40    buffer_a.update(cx_a, |buffer, cx| {
 41        buffer.edit([(0..0, "hello world")], None, cx)
 42    });
 43    buffer_a.update(cx_a, |buffer, cx| {
 44        buffer.edit([(5..5, ", cruel")], None, cx)
 45    });
 46    buffer_a.update(cx_a, |buffer, cx| {
 47        buffer.edit([(0..5, "goodbye")], None, cx)
 48    });
 49    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 50    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 51    executor.run_until_parked();
 52
 53    // Client B joins the channel buffer
 54    let channel_buffer_b = client_b
 55        .channel_store()
 56        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 57        .await
 58        .unwrap();
 59    channel_buffer_b.read_with(cx_b, |buffer, _| {
 60        assert_collaborators(
 61            buffer.collaborators(),
 62            &[client_a.user_id(), client_b.user_id()],
 63        );
 64    });
 65
 66    // Client B sees the correct text, and then edits it
 67    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 68    assert_eq!(
 69        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 70        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 71    );
 72    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 73    buffer_b.update(cx_b, |buffer, cx| {
 74        buffer.edit([(7..12, "beautiful")], None, cx)
 75    });
 76
 77    // Both A and B see the new edit
 78    executor.run_until_parked();
 79    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 80    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 81
 82    // Client A closes the channel buffer.
 83    cx_a.update(|_| drop(channel_buffer_a));
 84    executor.run_until_parked();
 85
 86    // Client B sees that client A is gone from the channel buffer.
 87    channel_buffer_b.read_with(cx_b, |buffer, _| {
 88        assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
 89    });
 90
 91    // Client A rejoins the channel buffer
 92    let _channel_buffer_a = client_a
 93        .channel_store()
 94        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 95        .await
 96        .unwrap();
 97    executor.run_until_parked();
 98
 99    // Sanity test, make sure we saw A rejoining
100    channel_buffer_b.read_with(cx_b, |buffer, _| {
101        assert_collaborators(
102            buffer.collaborators(),
103            &[client_a.user_id(), client_b.user_id()],
104        );
105    });
106
107    // Client A loses connection.
108    server.forbid_connections();
109    server.disconnect_client(client_a.peer_id().unwrap());
110    executor.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
111
112    // Client B observes A disconnect
113    channel_buffer_b.read_with(cx_b, |buffer, _| {
114        assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
115    });
116
117    // TODO:
118    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
119    // - Test interaction with channel deletion while buffer is open
120}
121
122#[gpui::test]
123async fn test_channel_notes_participant_indices(
124    executor: BackgroundExecutor,
125    cx_a: &mut TestAppContext,
126    cx_b: &mut TestAppContext,
127    cx_c: &mut TestAppContext,
128) {
129    let mut server = TestServer::start(executor.clone()).await;
130    let client_a = server.create_client(cx_a, "user_a").await;
131    let client_b = server.create_client(cx_b, "user_b").await;
132    let client_c = server.create_client(cx_c, "user_c").await;
133
134    let active_call_a = cx_a.read(ActiveCall::global);
135    let active_call_b = cx_b.read(ActiveCall::global);
136
137    cx_a.update(editor::init);
138    cx_b.update(editor::init);
139    cx_c.update(editor::init);
140
141    let channel_id = server
142        .make_channel(
143            "the-channel",
144            None,
145            (&client_a, cx_a),
146            &mut [(&client_b, cx_b), (&client_c, cx_c)],
147        )
148        .await;
149
150    client_a
151        .fs()
152        .insert_tree("/root", json!({"file.txt": "123"}))
153        .await;
154    let (project_a, worktree_id_a) = client_a.build_local_project_with_trust("/root", cx_a).await;
155    let project_b = client_b.build_empty_local_project(false, cx_b);
156    let project_c = client_c.build_empty_local_project(false, cx_c);
157
158    let (workspace_a, mut cx_a) = client_a.build_workspace(&project_a, cx_a);
159    let (workspace_b, mut cx_b) = client_b.build_workspace(&project_b, cx_b);
160    let (workspace_c, cx_c) = client_c.build_workspace(&project_c, cx_c);
161
162    // Clients A, B, and C open the channel notes
163    let channel_view_a = cx_a
164        .update(|window, cx| ChannelView::open(channel_id, None, workspace_a.clone(), window, cx))
165        .await
166        .unwrap();
167    let channel_view_b = cx_b
168        .update(|window, cx| ChannelView::open(channel_id, None, workspace_b.clone(), window, cx))
169        .await
170        .unwrap();
171    let channel_view_c = cx_c
172        .update(|window, cx| ChannelView::open(channel_id, None, workspace_c.clone(), window, cx))
173        .await
174        .unwrap();
175
176    // Clients A, B, and C all insert and select some text
177    channel_view_a.update_in(cx_a, |notes, window, cx| {
178        notes.editor.update(cx, |editor, cx| {
179            editor.insert("a", window, cx);
180            editor.change_selections(Default::default(), window, cx, |selections| {
181                selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
182            });
183        });
184    });
185    executor.run_until_parked();
186    channel_view_b.update_in(cx_b, |notes, window, cx| {
187        notes.editor.update(cx, |editor, cx| {
188            editor.move_down(&Default::default(), window, cx);
189            editor.insert("b", window, cx);
190            editor.change_selections(Default::default(), window, cx, |selections| {
191                selections.select_ranges(vec![MultiBufferOffset(1)..MultiBufferOffset(2)]);
192            });
193        });
194    });
195    executor.run_until_parked();
196    channel_view_c.update_in(cx_c, |notes, window, cx| {
197        notes.editor.update(cx, |editor, cx| {
198            editor.move_down(&Default::default(), window, cx);
199            editor.insert("c", window, cx);
200            editor.change_selections(Default::default(), window, cx, |selections| {
201                selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
202            });
203        });
204    });
205
206    // Client A sees clients B and C without assigned colors, because they aren't
207    // in a call together.
208    executor.run_until_parked();
209    channel_view_a.update_in(cx_a, |notes, window, cx| {
210        notes.editor.update(cx, |editor, cx| {
211            assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], window, cx);
212        });
213    });
214
215    // Clients A and B join the same call.
216    for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
217        call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
218            .await
219            .unwrap();
220    }
221
222    // Clients A and B see each other with two different assigned colors. Client C
223    // still doesn't have a color.
224    executor.run_until_parked();
225    channel_view_a.update_in(cx_a, |notes, window, cx| {
226        notes.editor.update(cx, |editor, cx| {
227            assert_remote_selections(
228                editor,
229                &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
230                window,
231                cx,
232            );
233        });
234    });
235    channel_view_b.update_in(cx_b, |notes, window, cx| {
236        notes.editor.update(cx, |editor, cx| {
237            assert_remote_selections(
238                editor,
239                &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
240                window,
241                cx,
242            );
243        });
244    });
245
246    // Client A shares a project, and client B joins.
247    let project_id = active_call_a
248        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
249        .await
250        .unwrap();
251    let project_b = client_b.join_remote_project(project_id, cx_b).await;
252    let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
253
254    // Clients A and B open the same file.
255    let editor_a = workspace_a
256        .update_in(cx_a, |workspace, window, cx| {
257            workspace.open_path(
258                (worktree_id_a, rel_path("file.txt")),
259                None,
260                true,
261                window,
262                cx,
263            )
264        })
265        .await
266        .unwrap()
267        .downcast::<Editor>()
268        .unwrap();
269    let editor_b = workspace_b
270        .update_in(cx_b, |workspace, window, cx| {
271            workspace.open_path(
272                (worktree_id_a, rel_path("file.txt")),
273                None,
274                true,
275                window,
276                cx,
277            )
278        })
279        .await
280        .unwrap()
281        .downcast::<Editor>()
282        .unwrap();
283
284    editor_a.update_in(cx_a, |editor, window, cx| {
285        editor.change_selections(Default::default(), window, cx, |selections| {
286            selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
287        });
288    });
289    editor_b.update_in(cx_b, |editor, window, cx| {
290        editor.change_selections(Default::default(), window, cx, |selections| {
291            selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
292        });
293    });
294    executor.run_until_parked();
295
296    // Clients A and B see each other with the same colors as in the channel notes.
297    editor_a.update_in(cx_a, |editor, window, cx| {
298        assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], window, cx);
299    });
300    editor_b.update_in(cx_b, |editor, window, cx| {
301        assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], window, cx);
302    });
303}
304
305#[track_caller]
306fn assert_remote_selections(
307    editor: &mut Editor,
308    expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
309    window: &mut Window,
310    cx: &mut Context<Editor>,
311) {
312    let snapshot = editor.snapshot(window, cx);
313    let hub = editor.collaboration_hub().unwrap();
314    let collaborators = hub.collaborators(cx);
315    let range = Anchor::min()..Anchor::max();
316    let remote_selections = snapshot
317        .remote_selections_in_range(&range, hub, cx)
318        .map(|s| {
319            let CollaboratorId::PeerId(peer_id) = s.collaborator_id else {
320                panic!("unexpected collaborator id");
321            };
322            let start = s.selection.start.to_offset(snapshot.buffer_snapshot());
323            let end = s.selection.end.to_offset(snapshot.buffer_snapshot());
324            let user_id = collaborators.get(&peer_id).unwrap().user_id;
325            let participant_index = hub.user_participant_indices(cx).get(&user_id).copied();
326            (participant_index, start.0..end.0)
327        })
328        .collect::<Vec<_>>();
329    assert_eq!(
330        remote_selections, expected_selections,
331        "incorrect remote selections"
332    );
333}
334
335#[gpui::test]
336async fn test_multiple_handles_to_channel_buffer(
337    deterministic: BackgroundExecutor,
338    cx_a: &mut TestAppContext,
339) {
340    let mut server = TestServer::start(deterministic.clone()).await;
341    let client_a = server.create_client(cx_a, "user_a").await;
342
343    let channel_id = server
344        .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
345        .await;
346
347    let channel_buffer_1 = client_a
348        .channel_store()
349        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
350    let channel_buffer_2 = client_a
351        .channel_store()
352        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
353    let channel_buffer_3 = client_a
354        .channel_store()
355        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
356
357    // All concurrent tasks for opening a channel buffer return the same model handle.
358    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
359        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
360            .await
361            .unwrap();
362    let channel_buffer_entity_id = channel_buffer.entity_id();
363    assert_eq!(channel_buffer, channel_buffer_2);
364    assert_eq!(channel_buffer, channel_buffer_3);
365
366    channel_buffer.update(cx_a, |buffer, cx| {
367        buffer.buffer().update(cx, |buffer, cx| {
368            buffer.edit([(0..0, "hello")], None, cx);
369        })
370    });
371    deterministic.run_until_parked();
372
373    cx_a.update(|_| {
374        drop(channel_buffer);
375        drop(channel_buffer_2);
376        drop(channel_buffer_3);
377    });
378    deterministic.run_until_parked();
379
380    // The channel buffer can be reopened after dropping it.
381    let channel_buffer = client_a
382        .channel_store()
383        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
384        .await
385        .unwrap();
386    assert_ne!(channel_buffer.entity_id(), channel_buffer_entity_id);
387    channel_buffer.update(cx_a, |buffer, cx| {
388        buffer.buffer().update(cx, |buffer, _| {
389            assert_eq!(buffer.text(), "hello");
390        })
391    });
392}
393
394#[gpui::test]
395async fn test_channel_buffer_disconnect(
396    deterministic: BackgroundExecutor,
397    cx_a: &mut TestAppContext,
398    cx_b: &mut TestAppContext,
399) {
400    let mut server = TestServer::start(deterministic.clone()).await;
401    let client_a = server.create_client(cx_a, "user_a").await;
402    let client_b = server.create_client(cx_b, "user_b").await;
403
404    let channel_id = server
405        .make_channel(
406            "the-channel",
407            None,
408            (&client_a, cx_a),
409            &mut [(&client_b, cx_b)],
410        )
411        .await;
412
413    let channel_buffer_a = client_a
414        .channel_store()
415        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
416        .await
417        .unwrap();
418
419    let channel_buffer_b = client_b
420        .channel_store()
421        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
422        .await
423        .unwrap();
424
425    server.forbid_connections();
426    server.disconnect_client(client_a.peer_id().unwrap());
427    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
428
429    channel_buffer_a.update(cx_a, |buffer, cx| {
430        assert_eq!(buffer.channel(cx).unwrap().name, "the-channel");
431        assert!(!buffer.is_connected());
432    });
433
434    deterministic.run_until_parked();
435
436    server.allow_connections();
437    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
438
439    deterministic.run_until_parked();
440
441    client_a
442        .channel_store()
443        .update(cx_a, |channel_store, _| {
444            channel_store.remove_channel(channel_id)
445        })
446        .await
447        .unwrap();
448    deterministic.run_until_parked();
449
450    // Channel buffer observed the deletion
451    channel_buffer_b.update(cx_b, |buffer, cx| {
452        assert!(buffer.channel(cx).is_none());
453        assert!(!buffer.is_connected());
454    });
455}
456
457#[gpui::test]
458async fn test_rejoin_channel_buffer(
459    deterministic: BackgroundExecutor,
460    cx_a: &mut TestAppContext,
461    cx_b: &mut TestAppContext,
462) {
463    let mut server = TestServer::start(deterministic.clone()).await;
464    let client_a = server.create_client(cx_a, "user_a").await;
465    let client_b = server.create_client(cx_b, "user_b").await;
466
467    let channel_id = server
468        .make_channel(
469            "the-channel",
470            None,
471            (&client_a, cx_a),
472            &mut [(&client_b, cx_b)],
473        )
474        .await;
475
476    let channel_buffer_a = client_a
477        .channel_store()
478        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
479        .await
480        .unwrap();
481    let channel_buffer_b = client_b
482        .channel_store()
483        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
484        .await
485        .unwrap();
486
487    channel_buffer_a.update(cx_a, |buffer, cx| {
488        buffer.buffer().update(cx, |buffer, cx| {
489            buffer.edit([(0..0, "1")], None, cx);
490        })
491    });
492    deterministic.run_until_parked();
493
494    // Client A disconnects.
495    server.forbid_connections();
496    server.disconnect_client(client_a.peer_id().unwrap());
497
498    // Both clients make an edit.
499    channel_buffer_a.update(cx_a, |buffer, cx| {
500        buffer.buffer().update(cx, |buffer, cx| {
501            buffer.edit([(1..1, "2")], None, cx);
502        })
503    });
504    channel_buffer_b.update(cx_b, |buffer, cx| {
505        buffer.buffer().update(cx, |buffer, cx| {
506            buffer.edit([(0..0, "0")], None, cx);
507        })
508    });
509
510    // Both clients see their own edit.
511    deterministic.run_until_parked();
512    channel_buffer_a.read_with(cx_a, |buffer, cx| {
513        assert_eq!(buffer.buffer().read(cx).text(), "12");
514    });
515    channel_buffer_b.read_with(cx_b, |buffer, cx| {
516        assert_eq!(buffer.buffer().read(cx).text(), "01");
517    });
518
519    // Client A reconnects. Both clients see each other's edits, and see
520    // the same collaborators.
521    server.allow_connections();
522    deterministic.advance_clock(RECEIVE_TIMEOUT);
523    channel_buffer_a.read_with(cx_a, |buffer, cx| {
524        assert_eq!(buffer.buffer().read(cx).text(), "012");
525    });
526    channel_buffer_b.read_with(cx_b, |buffer, cx| {
527        assert_eq!(buffer.buffer().read(cx).text(), "012");
528    });
529
530    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
531        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
532            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
533        });
534    });
535}
536
537#[gpui::test]
538async fn test_channel_buffers_and_server_restarts(
539    deterministic: BackgroundExecutor,
540    cx_a: &mut TestAppContext,
541    cx_b: &mut TestAppContext,
542    cx_c: &mut TestAppContext,
543) {
544    let mut server = TestServer::start(deterministic.clone()).await;
545    let client_a = server.create_client(cx_a, "user_a").await;
546    let client_b = server.create_client(cx_b, "user_b").await;
547    let client_c = server.create_client(cx_c, "user_c").await;
548
549    let channel_id = server
550        .make_channel(
551            "the-channel",
552            None,
553            (&client_a, cx_a),
554            &mut [(&client_b, cx_b), (&client_c, cx_c)],
555        )
556        .await;
557
558    let channel_buffer_a = client_a
559        .channel_store()
560        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
561        .await
562        .unwrap();
563    let channel_buffer_b = client_b
564        .channel_store()
565        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
566        .await
567        .unwrap();
568    let _channel_buffer_c = client_c
569        .channel_store()
570        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
571        .await
572        .unwrap();
573
574    channel_buffer_a.update(cx_a, |buffer, cx| {
575        buffer.buffer().update(cx, |buffer, cx| {
576            buffer.edit([(0..0, "1")], None, cx);
577        })
578    });
579    deterministic.run_until_parked();
580
581    // Client C can't reconnect.
582    client_c.override_establish_connection(|_, cx| cx.spawn(async |_| future::pending().await));
583
584    // Server stops.
585    server.reset().await;
586    deterministic.advance_clock(RECEIVE_TIMEOUT);
587
588    // While the server is down, both clients make an edit.
589    channel_buffer_a.update(cx_a, |buffer, cx| {
590        buffer.buffer().update(cx, |buffer, cx| {
591            buffer.edit([(1..1, "2")], None, cx);
592        })
593    });
594    channel_buffer_b.update(cx_b, |buffer, cx| {
595        buffer.buffer().update(cx, |buffer, cx| {
596            buffer.edit([(0..0, "0")], None, cx);
597        })
598    });
599
600    // Server restarts.
601    server.start().await.unwrap();
602    deterministic.advance_clock(CLEANUP_TIMEOUT);
603
604    // Clients reconnects. Clients A and B see each other's edits, and see
605    // that client C has disconnected.
606    channel_buffer_a.read_with(cx_a, |buffer, cx| {
607        assert_eq!(buffer.buffer().read(cx).text(), "012");
608    });
609    channel_buffer_b.read_with(cx_b, |buffer, cx| {
610        assert_eq!(buffer.buffer().read(cx).text(), "012");
611    });
612
613    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
614        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
615            assert_collaborators(
616                buffer_a.collaborators(),
617                &[client_a.user_id(), client_b.user_id()],
618            );
619            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
620        });
621    });
622}
623
624#[gpui::test]
625async fn test_channel_buffer_changes(
626    deterministic: BackgroundExecutor,
627    cx_a: &mut TestAppContext,
628    cx_b: &mut TestAppContext,
629) {
630    let (server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
631    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
632    let (workspace_b, cx_b) = client_b.build_test_workspace(cx_b).await;
633    let channel_store_b = client_b.channel_store().clone();
634
635    // Editing the channel notes should set them to dirty
636    open_channel_notes(channel_id, cx_a).await.unwrap();
637    cx_a.simulate_keystrokes("1");
638    channel_store_b.read_with(cx_b, |channel_store, _| {
639        assert!(channel_store.has_channel_buffer_changed(channel_id))
640    });
641
642    // Opening the buffer should clear the changed flag.
643    open_channel_notes(channel_id, cx_b).await.unwrap();
644    channel_store_b.read_with(cx_b, |channel_store, _| {
645        assert!(!channel_store.has_channel_buffer_changed(channel_id))
646    });
647
648    // Editing the channel while the buffer is open should not show that the buffer has changed.
649    cx_a.simulate_keystrokes("2");
650    channel_store_b.read_with(cx_b, |channel_store, _| {
651        assert!(!channel_store.has_channel_buffer_changed(channel_id))
652    });
653
654    // Test that the server is tracking things correctly, and we retain our 'not changed'
655    // state across a disconnect
656    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
657    server
658        .simulate_long_connection_interruption(client_b.peer_id().unwrap(), deterministic.clone());
659    channel_store_b.read_with(cx_b, |channel_store, _| {
660        assert!(!channel_store.has_channel_buffer_changed(channel_id))
661    });
662
663    // Closing the buffer should re-enable change tracking
664    cx_b.update(|window, cx| {
665        workspace_b.update(cx, |workspace, cx| {
666            workspace.close_all_items_and_panes(&Default::default(), window, cx)
667        });
668    });
669    deterministic.run_until_parked();
670
671    cx_a.simulate_keystrokes("3");
672    channel_store_b.read_with(cx_b, |channel_store, _| {
673        assert!(channel_store.has_channel_buffer_changed(channel_id))
674    });
675}
676
677#[gpui::test]
678async fn test_channel_buffer_changes_persist(
679    cx_a: &mut TestAppContext,
680    cx_b: &mut TestAppContext,
681    cx_b2: &mut TestAppContext,
682) {
683    let (mut server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
684    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
685    let (_, cx_b) = client_b.build_test_workspace(cx_b).await;
686
687    // a) edits the notes
688    open_channel_notes(channel_id, cx_a).await.unwrap();
689    cx_a.simulate_keystrokes("1");
690    // b) opens them to observe the current version
691    open_channel_notes(channel_id, cx_b).await.unwrap();
692
693    // On boot the client should get the correct state.
694    let client_b2 = server.create_client(cx_b2, "user_b").await;
695    let channel_store_b2 = client_b2.channel_store().clone();
696    channel_store_b2.read_with(cx_b2, |channel_store, _| {
697        assert!(!channel_store.has_channel_buffer_changed(channel_id))
698    });
699}
700
701#[track_caller]
702fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
703    let mut user_ids = collaborators
704        .values()
705        .map(|collaborator| collaborator.user_id)
706        .collect::<Vec<_>>();
707    user_ids.sort();
708    assert_eq!(
709        user_ids,
710        ids.iter().map(|id| id.unwrap()).collect::<Vec<_>>()
711    );
712}
713
714fn buffer_text(channel_buffer: &Entity<language::Buffer>, cx: &mut TestAppContext) -> String {
715    channel_buffer.read_with(cx, |buffer, _| buffer.text())
716}