channel_buffer_tests.rs

  1use crate::{
  2    rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
  3    tests::TestServer,
  4};
  5use call::ActiveCall;
  6use channel::{Channel, ACKNOWLEDGE_DEBOUNCE_INTERVAL};
  7use client::ParticipantIndex;
  8use client::{Collaborator, UserId};
  9use collab_ui::channel_view::ChannelView;
 10use collections::HashMap;
 11use editor::{Anchor, Editor, ToOffset};
 12use futures::future;
 13use gpui::{executor::Deterministic, ModelHandle, TestAppContext, ViewContext};
 14use rpc::{
 15    proto::{self, PeerId},
 16    RECEIVE_TIMEOUT,
 17};
 18use serde_json::json;
 19use std::{ops::Range, sync::Arc};
 20
 21#[gpui::test]
 22async fn test_core_channel_buffers(
 23    deterministic: Arc<Deterministic>,
 24    cx_a: &mut TestAppContext,
 25    cx_b: &mut TestAppContext,
 26) {
 27    deterministic.forbid_parking();
 28    let mut server = TestServer::start(&deterministic).await;
 29    let client_a = server.create_client(cx_a, "user_a").await;
 30    let client_b = server.create_client(cx_b, "user_b").await;
 31
 32    let channel_id = server
 33        .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
 34        .await;
 35
 36    // Client A joins the channel buffer
 37    let channel_buffer_a = client_a
 38        .channel_store()
 39        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 40        .await
 41        .unwrap();
 42
 43    // Client A edits the buffer
 44    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 45    buffer_a.update(cx_a, |buffer, cx| {
 46        buffer.edit([(0..0, "hello world")], None, cx)
 47    });
 48    buffer_a.update(cx_a, |buffer, cx| {
 49        buffer.edit([(5..5, ", cruel")], None, cx)
 50    });
 51    buffer_a.update(cx_a, |buffer, cx| {
 52        buffer.edit([(0..5, "goodbye")], None, cx)
 53    });
 54    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 55    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 56    deterministic.run_until_parked();
 57
 58    // Client B joins the channel buffer
 59    let channel_buffer_b = client_b
 60        .channel_store()
 61        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 62        .await
 63        .unwrap();
 64    channel_buffer_b.read_with(cx_b, |buffer, _| {
 65        assert_collaborators(
 66            buffer.collaborators(),
 67            &[client_a.user_id(), client_b.user_id()],
 68        );
 69    });
 70
 71    // Client B sees the correct text, and then edits it
 72    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 73    assert_eq!(
 74        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 75        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 76    );
 77    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 78    buffer_b.update(cx_b, |buffer, cx| {
 79        buffer.edit([(7..12, "beautiful")], None, cx)
 80    });
 81
 82    // Both A and B see the new edit
 83    deterministic.run_until_parked();
 84    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 85    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 86
 87    // Client A closes the channel buffer.
 88    cx_a.update(|_| drop(channel_buffer_a));
 89    deterministic.run_until_parked();
 90
 91    // Client B sees that client A is gone from the channel buffer.
 92    channel_buffer_b.read_with(cx_b, |buffer, _| {
 93        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
 94    });
 95
 96    // Client A rejoins the channel buffer
 97    let _channel_buffer_a = client_a
 98        .channel_store()
 99        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
100        .await
101        .unwrap();
102    deterministic.run_until_parked();
103
104    // Sanity test, make sure we saw A rejoining
105    channel_buffer_b.read_with(cx_b, |buffer, _| {
106        assert_collaborators(
107            &buffer.collaborators(),
108            &[client_a.user_id(), client_b.user_id()],
109        );
110    });
111
112    // Client A loses connection.
113    server.forbid_connections();
114    server.disconnect_client(client_a.peer_id().unwrap());
115    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
116
117    // Client B observes A disconnect
118    channel_buffer_b.read_with(cx_b, |buffer, _| {
119        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
120    });
121
122    // TODO:
123    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
124    // - Test interaction with channel deletion while buffer is open
125}
126
127#[gpui::test]
128async fn test_channel_notes_participant_indices(
129    deterministic: Arc<Deterministic>,
130    mut cx_a: &mut TestAppContext,
131    mut cx_b: &mut TestAppContext,
132    cx_c: &mut TestAppContext,
133) {
134    deterministic.forbid_parking();
135    let mut server = TestServer::start(&deterministic).await;
136    let client_a = server.create_client(cx_a, "user_a").await;
137    let client_b = server.create_client(cx_b, "user_b").await;
138    let client_c = server.create_client(cx_c, "user_c").await;
139
140    let active_call_a = cx_a.read(ActiveCall::global);
141    let active_call_b = cx_b.read(ActiveCall::global);
142
143    cx_a.update(editor::init);
144    cx_b.update(editor::init);
145    cx_c.update(editor::init);
146
147    let channel_id = server
148        .make_channel(
149            "the-channel",
150            None,
151            (&client_a, cx_a),
152            &mut [(&client_b, cx_b), (&client_c, cx_c)],
153        )
154        .await;
155
156    client_a
157        .fs()
158        .insert_tree("/root", json!({"file.txt": "123"}))
159        .await;
160    let (project_a, worktree_id_a) = client_a.build_local_project("/root", cx_a).await;
161    let project_b = client_b.build_empty_local_project(cx_b);
162    let project_c = client_c.build_empty_local_project(cx_c);
163    let workspace_a = client_a.build_workspace(&project_a, cx_a).root(cx_a);
164    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
165    let workspace_c = client_c.build_workspace(&project_c, cx_c).root(cx_c);
166
167    // Clients A, B, and C open the channel notes
168    let channel_view_a = cx_a
169        .update(|cx| ChannelView::open(channel_id, workspace_a.clone(), cx))
170        .await
171        .unwrap();
172    let channel_view_b = cx_b
173        .update(|cx| ChannelView::open(channel_id, workspace_b.clone(), cx))
174        .await
175        .unwrap();
176    let channel_view_c = cx_c
177        .update(|cx| ChannelView::open(channel_id, workspace_c.clone(), cx))
178        .await
179        .unwrap();
180
181    // Clients A, B, and C all insert and select some text
182    channel_view_a.update(cx_a, |notes, cx| {
183        notes.editor.update(cx, |editor, cx| {
184            editor.insert("a", cx);
185            editor.change_selections(None, cx, |selections| {
186                selections.select_ranges(vec![0..1]);
187            });
188        });
189    });
190    deterministic.run_until_parked();
191    channel_view_b.update(cx_b, |notes, cx| {
192        notes.editor.update(cx, |editor, cx| {
193            editor.move_down(&Default::default(), cx);
194            editor.insert("b", cx);
195            editor.change_selections(None, cx, |selections| {
196                selections.select_ranges(vec![1..2]);
197            });
198        });
199    });
200    deterministic.run_until_parked();
201    channel_view_c.update(cx_c, |notes, cx| {
202        notes.editor.update(cx, |editor, cx| {
203            editor.move_down(&Default::default(), cx);
204            editor.insert("c", cx);
205            editor.change_selections(None, cx, |selections| {
206                selections.select_ranges(vec![2..3]);
207            });
208        });
209    });
210
211    // Client A sees clients B and C without assigned colors, because they aren't
212    // in a call together.
213    deterministic.run_until_parked();
214    channel_view_a.update(cx_a, |notes, cx| {
215        notes.editor.update(cx, |editor, cx| {
216            assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], cx);
217        });
218    });
219
220    // Clients A and B join the same call.
221    for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
222        call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
223            .await
224            .unwrap();
225    }
226
227    // Clients A and B see each other with two different assigned colors. Client C
228    // still doesn't have a color.
229    deterministic.run_until_parked();
230    channel_view_a.update(cx_a, |notes, cx| {
231        notes.editor.update(cx, |editor, cx| {
232            assert_remote_selections(
233                editor,
234                &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
235                cx,
236            );
237        });
238    });
239    channel_view_b.update(cx_b, |notes, cx| {
240        notes.editor.update(cx, |editor, cx| {
241            assert_remote_selections(
242                editor,
243                &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
244                cx,
245            );
246        });
247    });
248
249    // Client A shares a project, and client B joins.
250    let project_id = active_call_a
251        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
252        .await
253        .unwrap();
254    let project_b = client_b.build_remote_project(project_id, cx_b).await;
255    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
256
257    // Clients A and B open the same file.
258    let editor_a = workspace_a
259        .update(cx_a, |workspace, cx| {
260            workspace.open_path((worktree_id_a, "file.txt"), None, true, cx)
261        })
262        .await
263        .unwrap()
264        .downcast::<Editor>()
265        .unwrap();
266    let editor_b = workspace_b
267        .update(cx_b, |workspace, cx| {
268            workspace.open_path((worktree_id_a, "file.txt"), None, true, cx)
269        })
270        .await
271        .unwrap()
272        .downcast::<Editor>()
273        .unwrap();
274
275    editor_a.update(cx_a, |editor, cx| {
276        editor.change_selections(None, cx, |selections| {
277            selections.select_ranges(vec![0..1]);
278        });
279    });
280    editor_b.update(cx_b, |editor, cx| {
281        editor.change_selections(None, cx, |selections| {
282            selections.select_ranges(vec![2..3]);
283        });
284    });
285    deterministic.run_until_parked();
286
287    // Clients A and B see each other with the same colors as in the channel notes.
288    editor_a.update(cx_a, |editor, cx| {
289        assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], cx);
290    });
291    editor_b.update(cx_b, |editor, cx| {
292        assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], cx);
293    });
294}
295
296#[track_caller]
297fn assert_remote_selections(
298    editor: &mut Editor,
299    expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
300    cx: &mut ViewContext<Editor>,
301) {
302    let snapshot = editor.snapshot(cx);
303    let range = Anchor::min()..Anchor::max();
304    let remote_selections = snapshot
305        .remote_selections_in_range(&range, editor.collaboration_hub().unwrap(), cx)
306        .map(|s| {
307            let start = s.selection.start.to_offset(&snapshot.buffer_snapshot);
308            let end = s.selection.end.to_offset(&snapshot.buffer_snapshot);
309            (s.participant_index, start..end)
310        })
311        .collect::<Vec<_>>();
312    assert_eq!(
313        remote_selections, expected_selections,
314        "incorrect remote selections"
315    );
316}
317
318#[gpui::test]
319async fn test_multiple_handles_to_channel_buffer(
320    deterministic: Arc<Deterministic>,
321    cx_a: &mut TestAppContext,
322) {
323    deterministic.forbid_parking();
324    let mut server = TestServer::start(&deterministic).await;
325    let client_a = server.create_client(cx_a, "user_a").await;
326
327    let channel_id = server
328        .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
329        .await;
330
331    let channel_buffer_1 = client_a
332        .channel_store()
333        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
334    let channel_buffer_2 = client_a
335        .channel_store()
336        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
337    let channel_buffer_3 = client_a
338        .channel_store()
339        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
340
341    // All concurrent tasks for opening a channel buffer return the same model handle.
342    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
343        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
344            .await
345            .unwrap();
346    let channel_buffer_model_id = channel_buffer.id();
347    assert_eq!(channel_buffer, channel_buffer_2);
348    assert_eq!(channel_buffer, channel_buffer_3);
349
350    channel_buffer.update(cx_a, |buffer, cx| {
351        buffer.buffer().update(cx, |buffer, cx| {
352            buffer.edit([(0..0, "hello")], None, cx);
353        })
354    });
355    deterministic.run_until_parked();
356
357    cx_a.update(|_| {
358        drop(channel_buffer);
359        drop(channel_buffer_2);
360        drop(channel_buffer_3);
361    });
362    deterministic.run_until_parked();
363
364    // The channel buffer can be reopened after dropping it.
365    let channel_buffer = client_a
366        .channel_store()
367        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
368        .await
369        .unwrap();
370    assert_ne!(channel_buffer.id(), channel_buffer_model_id);
371    channel_buffer.update(cx_a, |buffer, cx| {
372        buffer.buffer().update(cx, |buffer, _| {
373            assert_eq!(buffer.text(), "hello");
374        })
375    });
376}
377
378#[gpui::test]
379async fn test_channel_buffer_disconnect(
380    deterministic: Arc<Deterministic>,
381    cx_a: &mut TestAppContext,
382    cx_b: &mut TestAppContext,
383) {
384    deterministic.forbid_parking();
385    let mut server = TestServer::start(&deterministic).await;
386    let client_a = server.create_client(cx_a, "user_a").await;
387    let client_b = server.create_client(cx_b, "user_b").await;
388
389    let channel_id = server
390        .make_channel(
391            "the-channel",
392            None,
393            (&client_a, cx_a),
394            &mut [(&client_b, cx_b)],
395        )
396        .await;
397
398    let channel_buffer_a = client_a
399        .channel_store()
400        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
401        .await
402        .unwrap();
403    let channel_buffer_b = client_b
404        .channel_store()
405        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
406        .await
407        .unwrap();
408
409    server.forbid_connections();
410    server.disconnect_client(client_a.peer_id().unwrap());
411    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
412
413    channel_buffer_a.update(cx_a, |buffer, _| {
414        assert_eq!(
415            buffer.channel().as_ref(),
416            &channel(channel_id, "the-channel", proto::ChannelRole::Admin)
417        );
418        assert!(!buffer.is_connected());
419    });
420
421    deterministic.run_until_parked();
422
423    server.allow_connections();
424    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
425
426    deterministic.run_until_parked();
427
428    client_a
429        .channel_store()
430        .update(cx_a, |channel_store, _| {
431            channel_store.remove_channel(channel_id)
432        })
433        .await
434        .unwrap();
435    deterministic.run_until_parked();
436
437    // Channel buffer observed the deletion
438    channel_buffer_b.update(cx_b, |buffer, _| {
439        assert_eq!(
440            buffer.channel().as_ref(),
441            &channel(channel_id, "the-channel", proto::ChannelRole::Member)
442        );
443        assert!(!buffer.is_connected());
444    });
445}
446
447fn channel(id: u64, name: &'static str, role: proto::ChannelRole) -> Channel {
448    Channel {
449        id,
450        role,
451        name: name.to_string(),
452        visibility: proto::ChannelVisibility::Members,
453        unseen_note_version: None,
454        unseen_message_id: None,
455    }
456}
457
458#[gpui::test]
459async fn test_rejoin_channel_buffer(
460    deterministic: Arc<Deterministic>,
461    cx_a: &mut TestAppContext,
462    cx_b: &mut TestAppContext,
463) {
464    deterministic.forbid_parking();
465    let mut server = TestServer::start(&deterministic).await;
466    let client_a = server.create_client(cx_a, "user_a").await;
467    let client_b = server.create_client(cx_b, "user_b").await;
468
469    let channel_id = server
470        .make_channel(
471            "the-channel",
472            None,
473            (&client_a, cx_a),
474            &mut [(&client_b, cx_b)],
475        )
476        .await;
477
478    let channel_buffer_a = client_a
479        .channel_store()
480        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
481        .await
482        .unwrap();
483    let channel_buffer_b = client_b
484        .channel_store()
485        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
486        .await
487        .unwrap();
488
489    channel_buffer_a.update(cx_a, |buffer, cx| {
490        buffer.buffer().update(cx, |buffer, cx| {
491            buffer.edit([(0..0, "1")], None, cx);
492        })
493    });
494    deterministic.run_until_parked();
495
496    // Client A disconnects.
497    server.forbid_connections();
498    server.disconnect_client(client_a.peer_id().unwrap());
499
500    // Both clients make an edit.
501    channel_buffer_a.update(cx_a, |buffer, cx| {
502        buffer.buffer().update(cx, |buffer, cx| {
503            buffer.edit([(1..1, "2")], None, cx);
504        })
505    });
506    channel_buffer_b.update(cx_b, |buffer, cx| {
507        buffer.buffer().update(cx, |buffer, cx| {
508            buffer.edit([(0..0, "0")], None, cx);
509        })
510    });
511
512    // Both clients see their own edit.
513    deterministic.run_until_parked();
514    channel_buffer_a.read_with(cx_a, |buffer, cx| {
515        assert_eq!(buffer.buffer().read(cx).text(), "12");
516    });
517    channel_buffer_b.read_with(cx_b, |buffer, cx| {
518        assert_eq!(buffer.buffer().read(cx).text(), "01");
519    });
520
521    // Client A reconnects. Both clients see each other's edits, and see
522    // the same collaborators.
523    server.allow_connections();
524    deterministic.advance_clock(RECEIVE_TIMEOUT);
525    channel_buffer_a.read_with(cx_a, |buffer, cx| {
526        assert_eq!(buffer.buffer().read(cx).text(), "012");
527    });
528    channel_buffer_b.read_with(cx_b, |buffer, cx| {
529        assert_eq!(buffer.buffer().read(cx).text(), "012");
530    });
531
532    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
533        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
534            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
535        });
536    });
537}
538
539#[gpui::test]
540async fn test_channel_buffers_and_server_restarts(
541    deterministic: Arc<Deterministic>,
542    cx_a: &mut TestAppContext,
543    cx_b: &mut TestAppContext,
544    cx_c: &mut TestAppContext,
545) {
546    deterministic.forbid_parking();
547    let mut server = TestServer::start(&deterministic).await;
548    let client_a = server.create_client(cx_a, "user_a").await;
549    let client_b = server.create_client(cx_b, "user_b").await;
550    let client_c = server.create_client(cx_c, "user_c").await;
551
552    let channel_id = server
553        .make_channel(
554            "the-channel",
555            None,
556            (&client_a, cx_a),
557            &mut [(&client_b, cx_b), (&client_c, cx_c)],
558        )
559        .await;
560
561    let channel_buffer_a = client_a
562        .channel_store()
563        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
564        .await
565        .unwrap();
566    let channel_buffer_b = client_b
567        .channel_store()
568        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
569        .await
570        .unwrap();
571    let _channel_buffer_c = client_c
572        .channel_store()
573        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
574        .await
575        .unwrap();
576
577    channel_buffer_a.update(cx_a, |buffer, cx| {
578        buffer.buffer().update(cx, |buffer, cx| {
579            buffer.edit([(0..0, "1")], None, cx);
580        })
581    });
582    deterministic.run_until_parked();
583
584    // Client C can't reconnect.
585    client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
586
587    // Server stops.
588    server.reset().await;
589    deterministic.advance_clock(RECEIVE_TIMEOUT);
590
591    // While the server is down, both clients make an edit.
592    channel_buffer_a.update(cx_a, |buffer, cx| {
593        buffer.buffer().update(cx, |buffer, cx| {
594            buffer.edit([(1..1, "2")], None, cx);
595        })
596    });
597    channel_buffer_b.update(cx_b, |buffer, cx| {
598        buffer.buffer().update(cx, |buffer, cx| {
599            buffer.edit([(0..0, "0")], None, cx);
600        })
601    });
602
603    // Server restarts.
604    server.start().await.unwrap();
605    deterministic.advance_clock(CLEANUP_TIMEOUT);
606
607    // Clients reconnects. Clients A and B see each other's edits, and see
608    // that client C has disconnected.
609    channel_buffer_a.read_with(cx_a, |buffer, cx| {
610        assert_eq!(buffer.buffer().read(cx).text(), "012");
611    });
612    channel_buffer_b.read_with(cx_b, |buffer, cx| {
613        assert_eq!(buffer.buffer().read(cx).text(), "012");
614    });
615
616    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
617        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
618            assert_collaborators(
619                buffer_a.collaborators(),
620                &[client_a.user_id(), client_b.user_id()],
621            );
622            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
623        });
624    });
625}
626
627#[gpui::test(iterations = 10)]
628async fn test_following_to_channel_notes_without_a_shared_project(
629    deterministic: Arc<Deterministic>,
630    mut cx_a: &mut TestAppContext,
631    mut cx_b: &mut TestAppContext,
632    mut cx_c: &mut TestAppContext,
633) {
634    deterministic.forbid_parking();
635    let mut server = TestServer::start(&deterministic).await;
636    let client_a = server.create_client(cx_a, "user_a").await;
637    let client_b = server.create_client(cx_b, "user_b").await;
638
639    let client_c = server.create_client(cx_c, "user_c").await;
640
641    cx_a.update(editor::init);
642    cx_b.update(editor::init);
643    cx_c.update(editor::init);
644    cx_a.update(collab_ui::channel_view::init);
645    cx_b.update(collab_ui::channel_view::init);
646    cx_c.update(collab_ui::channel_view::init);
647
648    let channel_1_id = server
649        .make_channel(
650            "channel-1",
651            None,
652            (&client_a, cx_a),
653            &mut [(&client_b, cx_b), (&client_c, cx_c)],
654        )
655        .await;
656    let channel_2_id = server
657        .make_channel(
658            "channel-2",
659            None,
660            (&client_a, cx_a),
661            &mut [(&client_b, cx_b), (&client_c, cx_c)],
662        )
663        .await;
664
665    // Clients A, B, and C join a channel.
666    let active_call_a = cx_a.read(ActiveCall::global);
667    let active_call_b = cx_b.read(ActiveCall::global);
668    let active_call_c = cx_c.read(ActiveCall::global);
669    for (call, cx) in [
670        (&active_call_a, &mut cx_a),
671        (&active_call_b, &mut cx_b),
672        (&active_call_c, &mut cx_c),
673    ] {
674        call.update(*cx, |call, cx| call.join_channel(channel_1_id, cx))
675            .await
676            .unwrap();
677    }
678    deterministic.run_until_parked();
679
680    // Clients A, B, and C all open their own unshared projects.
681    client_a.fs().insert_tree("/a", json!({})).await;
682    client_b.fs().insert_tree("/b", json!({})).await;
683    client_c.fs().insert_tree("/c", json!({})).await;
684    let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
685    let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
686    let (project_c, _) = client_b.build_local_project("/c", cx_c).await;
687    let workspace_a = client_a.build_workspace(&project_a, cx_a).root(cx_a);
688    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
689    let _workspace_c = client_c.build_workspace(&project_c, cx_c).root(cx_c);
690
691    active_call_a
692        .update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
693        .await
694        .unwrap();
695
696    // Client A opens the notes for channel 1.
697    let channel_view_1_a = cx_a
698        .update(|cx| ChannelView::open(channel_1_id, workspace_a.clone(), cx))
699        .await
700        .unwrap();
701    channel_view_1_a.update(cx_a, |notes, cx| {
702        assert_eq!(notes.channel(cx).name, "channel-1");
703        notes.editor.update(cx, |editor, cx| {
704            editor.insert("Hello from A.", cx);
705            editor.change_selections(None, cx, |selections| {
706                selections.select_ranges(vec![3..4]);
707            });
708        });
709    });
710
711    // Client B follows client A.
712    workspace_b
713        .update(cx_b, |workspace, cx| {
714            workspace.follow(client_a.peer_id().unwrap(), cx).unwrap()
715        })
716        .await
717        .unwrap();
718
719    // Client B is taken to the notes for channel 1, with the same
720    // text selected as client A.
721    deterministic.run_until_parked();
722    let channel_view_1_b = workspace_b.read_with(cx_b, |workspace, cx| {
723        assert_eq!(
724            workspace.leader_for_pane(workspace.active_pane()),
725            Some(client_a.peer_id().unwrap())
726        );
727        workspace
728            .active_item(cx)
729            .expect("no active item")
730            .downcast::<ChannelView>()
731            .expect("active item is not a channel view")
732    });
733    channel_view_1_b.read_with(cx_b, |notes, cx| {
734        assert_eq!(notes.channel(cx).name, "channel-1");
735        let editor = notes.editor.read(cx);
736        assert_eq!(editor.text(cx), "Hello from A.");
737        assert_eq!(editor.selections.ranges::<usize>(cx), &[3..4]);
738    });
739
740    // Client A opens the notes for channel 2.
741    let channel_view_2_a = cx_a
742        .update(|cx| ChannelView::open(channel_2_id, workspace_a.clone(), cx))
743        .await
744        .unwrap();
745    channel_view_2_a.read_with(cx_a, |notes, cx| {
746        assert_eq!(notes.channel(cx).name, "channel-2");
747    });
748
749    // Client B is taken to the notes for channel 2.
750    deterministic.run_until_parked();
751    let channel_view_2_b = workspace_b.read_with(cx_b, |workspace, cx| {
752        assert_eq!(
753            workspace.leader_for_pane(workspace.active_pane()),
754            Some(client_a.peer_id().unwrap())
755        );
756        workspace
757            .active_item(cx)
758            .expect("no active item")
759            .downcast::<ChannelView>()
760            .expect("active item is not a channel view")
761    });
762    channel_view_2_b.read_with(cx_b, |notes, cx| {
763        assert_eq!(notes.channel(cx).name, "channel-2");
764    });
765}
766
767#[gpui::test]
768async fn test_channel_buffer_changes(
769    deterministic: Arc<Deterministic>,
770    cx_a: &mut TestAppContext,
771    cx_b: &mut TestAppContext,
772) {
773    deterministic.forbid_parking();
774    let mut server = TestServer::start(&deterministic).await;
775    let client_a = server.create_client(cx_a, "user_a").await;
776    let client_b = server.create_client(cx_b, "user_b").await;
777
778    let channel_id = server
779        .make_channel(
780            "the-channel",
781            None,
782            (&client_a, cx_a),
783            &mut [(&client_b, cx_b)],
784        )
785        .await;
786
787    let channel_buffer_a = client_a
788        .channel_store()
789        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
790        .await
791        .unwrap();
792
793    // Client A makes an edit, and client B should see that the note has changed.
794    channel_buffer_a.update(cx_a, |buffer, cx| {
795        buffer.buffer().update(cx, |buffer, cx| {
796            buffer.edit([(0..0, "1")], None, cx);
797        })
798    });
799    deterministic.run_until_parked();
800
801    let has_buffer_changed = cx_b.read(|cx| {
802        client_b
803            .channel_store()
804            .read(cx)
805            .has_channel_buffer_changed(channel_id)
806            .unwrap()
807    });
808    assert!(has_buffer_changed);
809
810    // Opening the buffer should clear the changed flag.
811    let project_b = client_b.build_empty_local_project(cx_b);
812    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
813    let channel_view_b = cx_b
814        .update(|cx| ChannelView::open(channel_id, workspace_b.clone(), cx))
815        .await
816        .unwrap();
817    deterministic.run_until_parked();
818
819    let has_buffer_changed = cx_b.read(|cx| {
820        client_b
821            .channel_store()
822            .read(cx)
823            .has_channel_buffer_changed(channel_id)
824            .unwrap()
825    });
826    assert!(!has_buffer_changed);
827
828    // Editing the channel while the buffer is open should not show that the buffer has changed.
829    channel_buffer_a.update(cx_a, |buffer, cx| {
830        buffer.buffer().update(cx, |buffer, cx| {
831            buffer.edit([(0..0, "2")], None, cx);
832        })
833    });
834    deterministic.run_until_parked();
835
836    let has_buffer_changed = cx_b.read(|cx| {
837        client_b
838            .channel_store()
839            .read(cx)
840            .has_channel_buffer_changed(channel_id)
841            .unwrap()
842    });
843    assert!(!has_buffer_changed);
844
845    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
846
847    // Test that the server is tracking things correctly, and we retain our 'not changed'
848    // state across a disconnect
849    server.simulate_long_connection_interruption(client_b.peer_id().unwrap(), &deterministic);
850    let has_buffer_changed = cx_b.read(|cx| {
851        client_b
852            .channel_store()
853            .read(cx)
854            .has_channel_buffer_changed(channel_id)
855            .unwrap()
856    });
857    assert!(!has_buffer_changed);
858
859    // Closing the buffer should re-enable change tracking
860    cx_b.update(|cx| {
861        workspace_b.update(cx, |workspace, cx| {
862            workspace.close_all_items_and_panes(&Default::default(), cx)
863        });
864
865        drop(channel_view_b)
866    });
867
868    deterministic.run_until_parked();
869
870    channel_buffer_a.update(cx_a, |buffer, cx| {
871        buffer.buffer().update(cx, |buffer, cx| {
872            buffer.edit([(0..0, "3")], None, cx);
873        })
874    });
875    deterministic.run_until_parked();
876
877    let has_buffer_changed = cx_b.read(|cx| {
878        client_b
879            .channel_store()
880            .read(cx)
881            .has_channel_buffer_changed(channel_id)
882            .unwrap()
883    });
884    assert!(has_buffer_changed);
885}
886
887#[track_caller]
888fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
889    let mut user_ids = collaborators
890        .values()
891        .map(|collaborator| collaborator.user_id)
892        .collect::<Vec<_>>();
893    user_ids.sort();
894    assert_eq!(
895        user_ids,
896        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
897    );
898}
899
900fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
901    channel_buffer.read_with(cx, |buffer, _| buffer.text())
902}