channel_buffer_tests.rs

  1use crate::{
  2    rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
  3    tests::{test_server::open_channel_notes, TestServer},
  4};
  5use call::ActiveCall;
  6use channel::ACKNOWLEDGE_DEBOUNCE_INTERVAL;
  7use client::{Collaborator, ParticipantIndex, UserId};
  8use collab_ui::channel_view::ChannelView;
  9use collections::HashMap;
 10use editor::{Anchor, Editor, ToOffset};
 11use futures::future;
 12use gpui::{BackgroundExecutor, Model, TestAppContext, ViewContext};
 13use rpc::{proto::PeerId, RECEIVE_TIMEOUT};
 14use serde_json::json;
 15use std::ops::Range;
 16
 17#[gpui::test]
 18async fn test_core_channel_buffers(
 19    executor: BackgroundExecutor,
 20    cx_a: &mut TestAppContext,
 21    cx_b: &mut TestAppContext,
 22) {
 23    let mut server = TestServer::start(executor.clone()).await;
 24    let client_a = server.create_client(cx_a, "user_a").await;
 25    let client_b = server.create_client(cx_b, "user_b").await;
 26
 27    let channel_id = server
 28        .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
 29        .await;
 30
 31    // Client A joins the channel buffer
 32    let channel_buffer_a = client_a
 33        .channel_store()
 34        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 35        .await
 36        .unwrap();
 37
 38    // Client A edits the buffer
 39    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 40    buffer_a.update(cx_a, |buffer, cx| {
 41        buffer.edit([(0..0, "hello world")], None, cx)
 42    });
 43    buffer_a.update(cx_a, |buffer, cx| {
 44        buffer.edit([(5..5, ", cruel")], None, cx)
 45    });
 46    buffer_a.update(cx_a, |buffer, cx| {
 47        buffer.edit([(0..5, "goodbye")], None, cx)
 48    });
 49    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 50    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 51    executor.run_until_parked();
 52
 53    // Client B joins the channel buffer
 54    let channel_buffer_b = client_b
 55        .channel_store()
 56        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 57        .await
 58        .unwrap();
 59    channel_buffer_b.read_with(cx_b, |buffer, _| {
 60        assert_collaborators(
 61            buffer.collaborators(),
 62            &[client_a.user_id(), client_b.user_id()],
 63        );
 64    });
 65
 66    // Client B sees the correct text, and then edits it
 67    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 68    assert_eq!(
 69        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 70        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 71    );
 72    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 73    buffer_b.update(cx_b, |buffer, cx| {
 74        buffer.edit([(7..12, "beautiful")], None, cx)
 75    });
 76
 77    // Both A and B see the new edit
 78    executor.run_until_parked();
 79    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 80    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 81
 82    // Client A closes the channel buffer.
 83    cx_a.update(|_| drop(channel_buffer_a));
 84    executor.run_until_parked();
 85
 86    // Client B sees that client A is gone from the channel buffer.
 87    channel_buffer_b.read_with(cx_b, |buffer, _| {
 88        assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
 89    });
 90
 91    // Client A rejoins the channel buffer
 92    let _channel_buffer_a = client_a
 93        .channel_store()
 94        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 95        .await
 96        .unwrap();
 97    executor.run_until_parked();
 98
 99    // Sanity test, make sure we saw A rejoining
100    channel_buffer_b.read_with(cx_b, |buffer, _| {
101        assert_collaborators(
102            buffer.collaborators(),
103            &[client_a.user_id(), client_b.user_id()],
104        );
105    });
106
107    // Client A loses connection.
108    server.forbid_connections();
109    server.disconnect_client(client_a.peer_id().unwrap());
110    executor.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
111
112    // Client B observes A disconnect
113    channel_buffer_b.read_with(cx_b, |buffer, _| {
114        assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
115    });
116
117    // TODO:
118    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
119    // - Test interaction with channel deletion while buffer is open
120}
121
122#[gpui::test]
123async fn test_channel_notes_participant_indices(
124    executor: BackgroundExecutor,
125    cx_a: &mut TestAppContext,
126    cx_b: &mut TestAppContext,
127    cx_c: &mut TestAppContext,
128) {
129    let mut server = TestServer::start(executor.clone()).await;
130    let client_a = server.create_client(cx_a, "user_a").await;
131    let client_b = server.create_client(cx_b, "user_b").await;
132    let client_c = server.create_client(cx_c, "user_c").await;
133
134    let active_call_a = cx_a.read(ActiveCall::global);
135    let active_call_b = cx_b.read(ActiveCall::global);
136
137    cx_a.update(editor::init);
138    cx_b.update(editor::init);
139    cx_c.update(editor::init);
140
141    let channel_id = server
142        .make_channel(
143            "the-channel",
144            None,
145            (&client_a, cx_a),
146            &mut [(&client_b, cx_b), (&client_c, cx_c)],
147        )
148        .await;
149
150    client_a
151        .fs()
152        .insert_tree("/root", json!({"file.txt": "123"}))
153        .await;
154    let (project_a, worktree_id_a) = client_a.build_local_project("/root", cx_a).await;
155    let project_b = client_b.build_empty_local_project(cx_b);
156    let project_c = client_c.build_empty_local_project(cx_c);
157
158    let (workspace_a, mut cx_a) = client_a.build_workspace(&project_a, cx_a);
159    let (workspace_b, mut cx_b) = client_b.build_workspace(&project_b, cx_b);
160    let (workspace_c, cx_c) = client_c.build_workspace(&project_c, cx_c);
161
162    // Clients A, B, and C open the channel notes
163    let channel_view_a = cx_a
164        .update(|cx| ChannelView::open(channel_id, None, workspace_a.clone(), cx))
165        .await
166        .unwrap();
167    let channel_view_b = cx_b
168        .update(|cx| ChannelView::open(channel_id, None, workspace_b.clone(), cx))
169        .await
170        .unwrap();
171    let channel_view_c = cx_c
172        .update(|cx| ChannelView::open(channel_id, None, workspace_c.clone(), cx))
173        .await
174        .unwrap();
175
176    // Clients A, B, and C all insert and select some text
177    channel_view_a.update(cx_a, |notes, cx| {
178        notes.editor.update(cx, |editor, cx| {
179            editor.insert("a", cx);
180            editor.change_selections(None, cx, |selections| {
181                selections.select_ranges(vec![0..1]);
182            });
183        });
184    });
185    executor.run_until_parked();
186    channel_view_b.update(cx_b, |notes, cx| {
187        notes.editor.update(cx, |editor, cx| {
188            editor.move_down(&Default::default(), cx);
189            editor.insert("b", cx);
190            editor.change_selections(None, cx, |selections| {
191                selections.select_ranges(vec![1..2]);
192            });
193        });
194    });
195    executor.run_until_parked();
196    channel_view_c.update(cx_c, |notes, cx| {
197        notes.editor.update(cx, |editor, cx| {
198            editor.move_down(&Default::default(), cx);
199            editor.insert("c", cx);
200            editor.change_selections(None, cx, |selections| {
201                selections.select_ranges(vec![2..3]);
202            });
203        });
204    });
205
206    // Client A sees clients B and C without assigned colors, because they aren't
207    // in a call together.
208    executor.run_until_parked();
209    channel_view_a.update(cx_a, |notes, cx| {
210        notes.editor.update(cx, |editor, cx| {
211            assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], cx);
212        });
213    });
214
215    // Clients A and B join the same call.
216    for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
217        call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
218            .await
219            .unwrap();
220    }
221
222    // Clients A and B see each other with two different assigned colors. Client C
223    // still doesn't have a color.
224    executor.run_until_parked();
225    channel_view_a.update(cx_a, |notes, cx| {
226        notes.editor.update(cx, |editor, cx| {
227            assert_remote_selections(
228                editor,
229                &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
230                cx,
231            );
232        });
233    });
234    channel_view_b.update(cx_b, |notes, cx| {
235        notes.editor.update(cx, |editor, cx| {
236            assert_remote_selections(
237                editor,
238                &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
239                cx,
240            );
241        });
242    });
243
244    // Client A shares a project, and client B joins.
245    let project_id = active_call_a
246        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
247        .await
248        .unwrap();
249    let project_b = client_b.join_remote_project(project_id, cx_b).await;
250    let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
251
252    // Clients A and B open the same file.
253    executor.start_waiting();
254    let editor_a = workspace_a
255        .update(cx_a, |workspace, cx| {
256            workspace.open_path((worktree_id_a, "file.txt"), None, true, cx)
257        })
258        .await
259        .unwrap()
260        .downcast::<Editor>()
261        .unwrap();
262    executor.start_waiting();
263    let editor_b = workspace_b
264        .update(cx_b, |workspace, cx| {
265            workspace.open_path((worktree_id_a, "file.txt"), None, true, cx)
266        })
267        .await
268        .unwrap()
269        .downcast::<Editor>()
270        .unwrap();
271
272    editor_a.update(cx_a, |editor, cx| {
273        editor.change_selections(None, cx, |selections| {
274            selections.select_ranges(vec![0..1]);
275        });
276    });
277    editor_b.update(cx_b, |editor, cx| {
278        editor.change_selections(None, cx, |selections| {
279            selections.select_ranges(vec![2..3]);
280        });
281    });
282    executor.run_until_parked();
283
284    // Clients A and B see each other with the same colors as in the channel notes.
285    editor_a.update(cx_a, |editor, cx| {
286        assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], cx);
287    });
288    editor_b.update(cx_b, |editor, cx| {
289        assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], cx);
290    });
291}
292
293#[track_caller]
294fn assert_remote_selections(
295    editor: &mut Editor,
296    expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
297    cx: &mut ViewContext<Editor>,
298) {
299    let snapshot = editor.snapshot(cx);
300    let range = Anchor::min()..Anchor::max();
301    let remote_selections = snapshot
302        .remote_selections_in_range(&range, editor.collaboration_hub().unwrap(), cx)
303        .map(|s| {
304            let start = s.selection.start.to_offset(&snapshot.buffer_snapshot);
305            let end = s.selection.end.to_offset(&snapshot.buffer_snapshot);
306            (s.participant_index, start..end)
307        })
308        .collect::<Vec<_>>();
309    assert_eq!(
310        remote_selections, expected_selections,
311        "incorrect remote selections"
312    );
313}
314
315#[gpui::test]
316async fn test_multiple_handles_to_channel_buffer(
317    deterministic: BackgroundExecutor,
318    cx_a: &mut TestAppContext,
319) {
320    let mut server = TestServer::start(deterministic.clone()).await;
321    let client_a = server.create_client(cx_a, "user_a").await;
322
323    let channel_id = server
324        .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
325        .await;
326
327    let channel_buffer_1 = client_a
328        .channel_store()
329        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
330    let channel_buffer_2 = client_a
331        .channel_store()
332        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
333    let channel_buffer_3 = client_a
334        .channel_store()
335        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
336
337    // All concurrent tasks for opening a channel buffer return the same model handle.
338    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
339        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
340            .await
341            .unwrap();
342    let channel_buffer_model_id = channel_buffer.entity_id();
343    assert_eq!(channel_buffer, channel_buffer_2);
344    assert_eq!(channel_buffer, channel_buffer_3);
345
346    channel_buffer.update(cx_a, |buffer, cx| {
347        buffer.buffer().update(cx, |buffer, cx| {
348            buffer.edit([(0..0, "hello")], None, cx);
349        })
350    });
351    deterministic.run_until_parked();
352
353    cx_a.update(|_| {
354        drop(channel_buffer);
355        drop(channel_buffer_2);
356        drop(channel_buffer_3);
357    });
358    deterministic.run_until_parked();
359
360    // The channel buffer can be reopened after dropping it.
361    let channel_buffer = client_a
362        .channel_store()
363        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
364        .await
365        .unwrap();
366    assert_ne!(channel_buffer.entity_id(), channel_buffer_model_id);
367    channel_buffer.update(cx_a, |buffer, cx| {
368        buffer.buffer().update(cx, |buffer, _| {
369            assert_eq!(buffer.text(), "hello");
370        })
371    });
372}
373
374#[gpui::test]
375async fn test_channel_buffer_disconnect(
376    deterministic: BackgroundExecutor,
377    cx_a: &mut TestAppContext,
378    cx_b: &mut TestAppContext,
379) {
380    let mut server = TestServer::start(deterministic.clone()).await;
381    let client_a = server.create_client(cx_a, "user_a").await;
382    let client_b = server.create_client(cx_b, "user_b").await;
383
384    let channel_id = server
385        .make_channel(
386            "the-channel",
387            None,
388            (&client_a, cx_a),
389            &mut [(&client_b, cx_b)],
390        )
391        .await;
392
393    let channel_buffer_a = client_a
394        .channel_store()
395        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
396        .await
397        .unwrap();
398
399    let channel_buffer_b = client_b
400        .channel_store()
401        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
402        .await
403        .unwrap();
404
405    server.forbid_connections();
406    server.disconnect_client(client_a.peer_id().unwrap());
407    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
408
409    channel_buffer_a.update(cx_a, |buffer, cx| {
410        assert_eq!(buffer.channel(cx).unwrap().name, "the-channel");
411        assert!(!buffer.is_connected());
412    });
413
414    deterministic.run_until_parked();
415
416    server.allow_connections();
417    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
418
419    deterministic.run_until_parked();
420
421    client_a
422        .channel_store()
423        .update(cx_a, |channel_store, _| {
424            channel_store.remove_channel(channel_id)
425        })
426        .await
427        .unwrap();
428    deterministic.run_until_parked();
429
430    // Channel buffer observed the deletion
431    channel_buffer_b.update(cx_b, |buffer, cx| {
432        assert!(buffer.channel(cx).is_none());
433        assert!(!buffer.is_connected());
434    });
435}
436
437#[gpui::test]
438async fn test_rejoin_channel_buffer(
439    deterministic: BackgroundExecutor,
440    cx_a: &mut TestAppContext,
441    cx_b: &mut TestAppContext,
442) {
443    let mut server = TestServer::start(deterministic.clone()).await;
444    let client_a = server.create_client(cx_a, "user_a").await;
445    let client_b = server.create_client(cx_b, "user_b").await;
446
447    let channel_id = server
448        .make_channel(
449            "the-channel",
450            None,
451            (&client_a, cx_a),
452            &mut [(&client_b, cx_b)],
453        )
454        .await;
455
456    let channel_buffer_a = client_a
457        .channel_store()
458        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
459        .await
460        .unwrap();
461    let channel_buffer_b = client_b
462        .channel_store()
463        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
464        .await
465        .unwrap();
466
467    channel_buffer_a.update(cx_a, |buffer, cx| {
468        buffer.buffer().update(cx, |buffer, cx| {
469            buffer.edit([(0..0, "1")], None, cx);
470        })
471    });
472    deterministic.run_until_parked();
473
474    // Client A disconnects.
475    server.forbid_connections();
476    server.disconnect_client(client_a.peer_id().unwrap());
477
478    // Both clients make an edit.
479    channel_buffer_a.update(cx_a, |buffer, cx| {
480        buffer.buffer().update(cx, |buffer, cx| {
481            buffer.edit([(1..1, "2")], None, cx);
482        })
483    });
484    channel_buffer_b.update(cx_b, |buffer, cx| {
485        buffer.buffer().update(cx, |buffer, cx| {
486            buffer.edit([(0..0, "0")], None, cx);
487        })
488    });
489
490    // Both clients see their own edit.
491    deterministic.run_until_parked();
492    channel_buffer_a.read_with(cx_a, |buffer, cx| {
493        assert_eq!(buffer.buffer().read(cx).text(), "12");
494    });
495    channel_buffer_b.read_with(cx_b, |buffer, cx| {
496        assert_eq!(buffer.buffer().read(cx).text(), "01");
497    });
498
499    // Client A reconnects. Both clients see each other's edits, and see
500    // the same collaborators.
501    server.allow_connections();
502    deterministic.advance_clock(RECEIVE_TIMEOUT);
503    channel_buffer_a.read_with(cx_a, |buffer, cx| {
504        assert_eq!(buffer.buffer().read(cx).text(), "012");
505    });
506    channel_buffer_b.read_with(cx_b, |buffer, cx| {
507        assert_eq!(buffer.buffer().read(cx).text(), "012");
508    });
509
510    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
511        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
512            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
513        });
514    });
515}
516
517#[gpui::test]
518async fn test_channel_buffers_and_server_restarts(
519    deterministic: BackgroundExecutor,
520    cx_a: &mut TestAppContext,
521    cx_b: &mut TestAppContext,
522    cx_c: &mut TestAppContext,
523) {
524    let mut server = TestServer::start(deterministic.clone()).await;
525    let client_a = server.create_client(cx_a, "user_a").await;
526    let client_b = server.create_client(cx_b, "user_b").await;
527    let client_c = server.create_client(cx_c, "user_c").await;
528
529    let channel_id = server
530        .make_channel(
531            "the-channel",
532            None,
533            (&client_a, cx_a),
534            &mut [(&client_b, cx_b), (&client_c, cx_c)],
535        )
536        .await;
537
538    let channel_buffer_a = client_a
539        .channel_store()
540        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
541        .await
542        .unwrap();
543    let channel_buffer_b = client_b
544        .channel_store()
545        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
546        .await
547        .unwrap();
548    let _channel_buffer_c = client_c
549        .channel_store()
550        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
551        .await
552        .unwrap();
553
554    channel_buffer_a.update(cx_a, |buffer, cx| {
555        buffer.buffer().update(cx, |buffer, cx| {
556            buffer.edit([(0..0, "1")], None, cx);
557        })
558    });
559    deterministic.run_until_parked();
560
561    // Client C can't reconnect.
562    client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
563
564    // Server stops.
565    server.reset().await;
566    deterministic.advance_clock(RECEIVE_TIMEOUT);
567
568    // While the server is down, both clients make an edit.
569    channel_buffer_a.update(cx_a, |buffer, cx| {
570        buffer.buffer().update(cx, |buffer, cx| {
571            buffer.edit([(1..1, "2")], None, cx);
572        })
573    });
574    channel_buffer_b.update(cx_b, |buffer, cx| {
575        buffer.buffer().update(cx, |buffer, cx| {
576            buffer.edit([(0..0, "0")], None, cx);
577        })
578    });
579
580    // Server restarts.
581    server.start().await.unwrap();
582    deterministic.advance_clock(CLEANUP_TIMEOUT);
583
584    // Clients reconnects. Clients A and B see each other's edits, and see
585    // that client C has disconnected.
586    channel_buffer_a.read_with(cx_a, |buffer, cx| {
587        assert_eq!(buffer.buffer().read(cx).text(), "012");
588    });
589    channel_buffer_b.read_with(cx_b, |buffer, cx| {
590        assert_eq!(buffer.buffer().read(cx).text(), "012");
591    });
592
593    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
594        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
595            assert_collaborators(
596                buffer_a.collaborators(),
597                &[client_a.user_id(), client_b.user_id()],
598            );
599            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
600        });
601    });
602}
603
604#[gpui::test]
605async fn test_channel_buffer_changes(
606    deterministic: BackgroundExecutor,
607    cx_a: &mut TestAppContext,
608    cx_b: &mut TestAppContext,
609) {
610    let (server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
611    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
612    let (workspace_b, cx_b) = client_b.build_test_workspace(cx_b).await;
613    let channel_store_b = client_b.channel_store().clone();
614
615    // Editing the channel notes should set them to dirty
616    open_channel_notes(channel_id, cx_a).await.unwrap();
617    cx_a.simulate_keystrokes("1");
618    channel_store_b.read_with(cx_b, |channel_store, _| {
619        assert!(channel_store.has_channel_buffer_changed(channel_id))
620    });
621
622    // Opening the buffer should clear the changed flag.
623    open_channel_notes(channel_id, cx_b).await.unwrap();
624    channel_store_b.read_with(cx_b, |channel_store, _| {
625        assert!(!channel_store.has_channel_buffer_changed(channel_id))
626    });
627
628    // Editing the channel while the buffer is open should not show that the buffer has changed.
629    cx_a.simulate_keystrokes("2");
630    channel_store_b.read_with(cx_b, |channel_store, _| {
631        assert!(!channel_store.has_channel_buffer_changed(channel_id))
632    });
633
634    // Test that the server is tracking things correctly, and we retain our 'not changed'
635    // state across a disconnect
636    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
637    server
638        .simulate_long_connection_interruption(client_b.peer_id().unwrap(), deterministic.clone());
639    channel_store_b.read_with(cx_b, |channel_store, _| {
640        assert!(!channel_store.has_channel_buffer_changed(channel_id))
641    });
642
643    // Closing the buffer should re-enable change tracking
644    cx_b.update(|cx| {
645        workspace_b.update(cx, |workspace, cx| {
646            workspace.close_all_items_and_panes(&Default::default(), cx)
647        });
648    });
649    deterministic.run_until_parked();
650
651    cx_a.simulate_keystrokes("3");
652    channel_store_b.read_with(cx_b, |channel_store, _| {
653        assert!(channel_store.has_channel_buffer_changed(channel_id))
654    });
655}
656
657#[gpui::test]
658async fn test_channel_buffer_changes_persist(
659    cx_a: &mut TestAppContext,
660    cx_b: &mut TestAppContext,
661    cx_b2: &mut TestAppContext,
662) {
663    let (mut server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
664    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
665    let (_, cx_b) = client_b.build_test_workspace(cx_b).await;
666
667    // a) edits the notes
668    open_channel_notes(channel_id, cx_a).await.unwrap();
669    cx_a.simulate_keystrokes("1");
670    // b) opens them to observe the current version
671    open_channel_notes(channel_id, cx_b).await.unwrap();
672
673    // On boot the client should get the correct state.
674    let client_b2 = server.create_client(cx_b2, "user_b").await;
675    let channel_store_b2 = client_b2.channel_store().clone();
676    channel_store_b2.read_with(cx_b2, |channel_store, _| {
677        assert!(!channel_store.has_channel_buffer_changed(channel_id))
678    });
679}
680
681#[track_caller]
682fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
683    let mut user_ids = collaborators
684        .values()
685        .map(|collaborator| collaborator.user_id)
686        .collect::<Vec<_>>();
687    user_ids.sort();
688    assert_eq!(
689        user_ids,
690        ids.iter().map(|id| id.unwrap()).collect::<Vec<_>>()
691    );
692}
693
694fn buffer_text(channel_buffer: &Model<language::Buffer>, cx: &mut TestAppContext) -> String {
695    channel_buffer.read_with(cx, |buffer, _| buffer.text())
696}