channel_buffer_tests.rs

  1use crate::{
  2    rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
  3    tests::TestServer,
  4};
  5use call::ActiveCall;
  6use channel::{Channel, ACKNOWLEDGE_DEBOUNCE_INTERVAL};
  7use client::ParticipantIndex;
  8use client::{Collaborator, UserId};
  9use collab_ui::channel_view::ChannelView;
 10use collections::HashMap;
 11use editor::{Anchor, Editor, ToOffset};
 12use futures::future;
 13use gpui::{executor::Deterministic, ModelHandle, TestAppContext, ViewContext};
 14use rpc::{proto::PeerId, RECEIVE_TIMEOUT};
 15use serde_json::json;
 16use std::{ops::Range, sync::Arc};
 17
 18#[gpui::test]
 19async fn test_core_channel_buffers(
 20    deterministic: Arc<Deterministic>,
 21    cx_a: &mut TestAppContext,
 22    cx_b: &mut TestAppContext,
 23) {
 24    deterministic.forbid_parking();
 25    let mut server = TestServer::start(&deterministic).await;
 26    let client_a = server.create_client(cx_a, "user_a").await;
 27    let client_b = server.create_client(cx_b, "user_b").await;
 28
 29    let channel_id = server
 30        .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
 31        .await;
 32
 33    // Client A joins the channel buffer
 34    let channel_buffer_a = client_a
 35        .channel_store()
 36        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 37        .await
 38        .unwrap();
 39
 40    // Client A edits the buffer
 41    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 42    buffer_a.update(cx_a, |buffer, cx| {
 43        buffer.edit([(0..0, "hello world")], None, cx)
 44    });
 45    buffer_a.update(cx_a, |buffer, cx| {
 46        buffer.edit([(5..5, ", cruel")], None, cx)
 47    });
 48    buffer_a.update(cx_a, |buffer, cx| {
 49        buffer.edit([(0..5, "goodbye")], None, cx)
 50    });
 51    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 52    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 53    deterministic.run_until_parked();
 54
 55    // Client B joins the channel buffer
 56    let channel_buffer_b = client_b
 57        .channel_store()
 58        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 59        .await
 60        .unwrap();
 61    channel_buffer_b.read_with(cx_b, |buffer, _| {
 62        assert_collaborators(
 63            buffer.collaborators(),
 64            &[client_a.user_id(), client_b.user_id()],
 65        );
 66    });
 67
 68    // Client B sees the correct text, and then edits it
 69    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 70    assert_eq!(
 71        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 72        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 73    );
 74    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 75    buffer_b.update(cx_b, |buffer, cx| {
 76        buffer.edit([(7..12, "beautiful")], None, cx)
 77    });
 78
 79    // Both A and B see the new edit
 80    deterministic.run_until_parked();
 81    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 82    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 83
 84    // Client A closes the channel buffer.
 85    cx_a.update(|_| drop(channel_buffer_a));
 86    deterministic.run_until_parked();
 87
 88    // Client B sees that client A is gone from the channel buffer.
 89    channel_buffer_b.read_with(cx_b, |buffer, _| {
 90        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
 91    });
 92
 93    // Client A rejoins the channel buffer
 94    let _channel_buffer_a = client_a
 95        .channel_store()
 96        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 97        .await
 98        .unwrap();
 99    deterministic.run_until_parked();
100
101    // Sanity test, make sure we saw A rejoining
102    channel_buffer_b.read_with(cx_b, |buffer, _| {
103        assert_collaborators(
104            &buffer.collaborators(),
105            &[client_a.user_id(), client_b.user_id()],
106        );
107    });
108
109    // Client A loses connection.
110    server.forbid_connections();
111    server.disconnect_client(client_a.peer_id().unwrap());
112    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
113
114    // Client B observes A disconnect
115    channel_buffer_b.read_with(cx_b, |buffer, _| {
116        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
117    });
118
119    // TODO:
120    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
121    // - Test interaction with channel deletion while buffer is open
122}
123
124#[gpui::test]
125async fn test_channel_notes_participant_indices(
126    deterministic: Arc<Deterministic>,
127    mut cx_a: &mut TestAppContext,
128    mut cx_b: &mut TestAppContext,
129    cx_c: &mut TestAppContext,
130) {
131    deterministic.forbid_parking();
132    let mut server = TestServer::start(&deterministic).await;
133    let client_a = server.create_client(cx_a, "user_a").await;
134    let client_b = server.create_client(cx_b, "user_b").await;
135    let client_c = server.create_client(cx_c, "user_c").await;
136
137    let active_call_a = cx_a.read(ActiveCall::global);
138    let active_call_b = cx_b.read(ActiveCall::global);
139
140    cx_a.update(editor::init);
141    cx_b.update(editor::init);
142    cx_c.update(editor::init);
143
144    let channel_id = server
145        .make_channel(
146            "the-channel",
147            None,
148            (&client_a, cx_a),
149            &mut [(&client_b, cx_b), (&client_c, cx_c)],
150        )
151        .await;
152
153    client_a
154        .fs()
155        .insert_tree("/root", json!({"file.txt": "123"}))
156        .await;
157    let (project_a, worktree_id_a) = client_a.build_local_project("/root", cx_a).await;
158    let project_b = client_b.build_empty_local_project(cx_b);
159    let project_c = client_c.build_empty_local_project(cx_c);
160    let workspace_a = client_a.build_workspace(&project_a, cx_a).root(cx_a);
161    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
162    let workspace_c = client_c.build_workspace(&project_c, cx_c).root(cx_c);
163
164    // Clients A, B, and C open the channel notes
165    let channel_view_a = cx_a
166        .update(|cx| ChannelView::open(channel_id, workspace_a.clone(), cx))
167        .await
168        .unwrap();
169    let channel_view_b = cx_b
170        .update(|cx| ChannelView::open(channel_id, workspace_b.clone(), cx))
171        .await
172        .unwrap();
173    let channel_view_c = cx_c
174        .update(|cx| ChannelView::open(channel_id, workspace_c.clone(), cx))
175        .await
176        .unwrap();
177
178    // Clients A, B, and C all insert and select some text
179    channel_view_a.update(cx_a, |notes, cx| {
180        notes.editor.update(cx, |editor, cx| {
181            editor.insert("a", cx);
182            editor.change_selections(None, cx, |selections| {
183                selections.select_ranges(vec![0..1]);
184            });
185        });
186    });
187    deterministic.run_until_parked();
188    channel_view_b.update(cx_b, |notes, cx| {
189        notes.editor.update(cx, |editor, cx| {
190            editor.move_down(&Default::default(), cx);
191            editor.insert("b", cx);
192            editor.change_selections(None, cx, |selections| {
193                selections.select_ranges(vec![1..2]);
194            });
195        });
196    });
197    deterministic.run_until_parked();
198    channel_view_c.update(cx_c, |notes, cx| {
199        notes.editor.update(cx, |editor, cx| {
200            editor.move_down(&Default::default(), cx);
201            editor.insert("c", cx);
202            editor.change_selections(None, cx, |selections| {
203                selections.select_ranges(vec![2..3]);
204            });
205        });
206    });
207
208    // Client A sees clients B and C without assigned colors, because they aren't
209    // in a call together.
210    deterministic.run_until_parked();
211    channel_view_a.update(cx_a, |notes, cx| {
212        notes.editor.update(cx, |editor, cx| {
213            assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], cx);
214        });
215    });
216
217    // Clients A and B join the same call.
218    for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
219        call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
220            .await
221            .unwrap();
222    }
223
224    // Clients A and B see each other with two different assigned colors. Client C
225    // still doesn't have a color.
226    deterministic.run_until_parked();
227    channel_view_a.update(cx_a, |notes, cx| {
228        notes.editor.update(cx, |editor, cx| {
229            assert_remote_selections(
230                editor,
231                &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
232                cx,
233            );
234        });
235    });
236    channel_view_b.update(cx_b, |notes, cx| {
237        notes.editor.update(cx, |editor, cx| {
238            assert_remote_selections(
239                editor,
240                &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
241                cx,
242            );
243        });
244    });
245
246    // Client A shares a project, and client B joins.
247    let project_id = active_call_a
248        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
249        .await
250        .unwrap();
251    let project_b = client_b.build_remote_project(project_id, cx_b).await;
252    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
253
254    // Clients A and B open the same file.
255    let editor_a = workspace_a
256        .update(cx_a, |workspace, cx| {
257            workspace.open_path((worktree_id_a, "file.txt"), None, true, cx)
258        })
259        .await
260        .unwrap()
261        .downcast::<Editor>()
262        .unwrap();
263    let editor_b = workspace_b
264        .update(cx_b, |workspace, cx| {
265            workspace.open_path((worktree_id_a, "file.txt"), None, true, cx)
266        })
267        .await
268        .unwrap()
269        .downcast::<Editor>()
270        .unwrap();
271
272    editor_a.update(cx_a, |editor, cx| {
273        editor.change_selections(None, cx, |selections| {
274            selections.select_ranges(vec![0..1]);
275        });
276    });
277    editor_b.update(cx_b, |editor, cx| {
278        editor.change_selections(None, cx, |selections| {
279            selections.select_ranges(vec![2..3]);
280        });
281    });
282    deterministic.run_until_parked();
283
284    // Clients A and B see each other with the same colors as in the channel notes.
285    editor_a.update(cx_a, |editor, cx| {
286        assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], cx);
287    });
288    editor_b.update(cx_b, |editor, cx| {
289        assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], cx);
290    });
291}
292
293#[track_caller]
294fn assert_remote_selections(
295    editor: &mut Editor,
296    expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
297    cx: &mut ViewContext<Editor>,
298) {
299    let snapshot = editor.snapshot(cx);
300    let range = Anchor::min()..Anchor::max();
301    let remote_selections = snapshot
302        .remote_selections_in_range(&range, editor.collaboration_hub().unwrap(), cx)
303        .map(|s| {
304            let start = s.selection.start.to_offset(&snapshot.buffer_snapshot);
305            let end = s.selection.end.to_offset(&snapshot.buffer_snapshot);
306            (s.participant_index, start..end)
307        })
308        .collect::<Vec<_>>();
309    assert_eq!(
310        remote_selections, expected_selections,
311        "incorrect remote selections"
312    );
313}
314
315#[gpui::test]
316async fn test_multiple_handles_to_channel_buffer(
317    deterministic: Arc<Deterministic>,
318    cx_a: &mut TestAppContext,
319) {
320    deterministic.forbid_parking();
321    let mut server = TestServer::start(&deterministic).await;
322    let client_a = server.create_client(cx_a, "user_a").await;
323
324    let channel_id = server
325        .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
326        .await;
327
328    let channel_buffer_1 = client_a
329        .channel_store()
330        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
331    let channel_buffer_2 = client_a
332        .channel_store()
333        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
334    let channel_buffer_3 = client_a
335        .channel_store()
336        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
337
338    // All concurrent tasks for opening a channel buffer return the same model handle.
339    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
340        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
341            .await
342            .unwrap();
343    let channel_buffer_model_id = channel_buffer.id();
344    assert_eq!(channel_buffer, channel_buffer_2);
345    assert_eq!(channel_buffer, channel_buffer_3);
346
347    channel_buffer.update(cx_a, |buffer, cx| {
348        buffer.buffer().update(cx, |buffer, cx| {
349            buffer.edit([(0..0, "hello")], None, cx);
350        })
351    });
352    deterministic.run_until_parked();
353
354    cx_a.update(|_| {
355        drop(channel_buffer);
356        drop(channel_buffer_2);
357        drop(channel_buffer_3);
358    });
359    deterministic.run_until_parked();
360
361    // The channel buffer can be reopened after dropping it.
362    let channel_buffer = client_a
363        .channel_store()
364        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
365        .await
366        .unwrap();
367    assert_ne!(channel_buffer.id(), channel_buffer_model_id);
368    channel_buffer.update(cx_a, |buffer, cx| {
369        buffer.buffer().update(cx, |buffer, _| {
370            assert_eq!(buffer.text(), "hello");
371        })
372    });
373}
374
375#[gpui::test]
376async fn test_channel_buffer_disconnect(
377    deterministic: Arc<Deterministic>,
378    cx_a: &mut TestAppContext,
379    cx_b: &mut TestAppContext,
380) {
381    deterministic.forbid_parking();
382    let mut server = TestServer::start(&deterministic).await;
383    let client_a = server.create_client(cx_a, "user_a").await;
384    let client_b = server.create_client(cx_b, "user_b").await;
385
386    let channel_id = server
387        .make_channel(
388            "the-channel",
389            None,
390            (&client_a, cx_a),
391            &mut [(&client_b, cx_b)],
392        )
393        .await;
394
395    let channel_buffer_a = client_a
396        .channel_store()
397        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
398        .await
399        .unwrap();
400    let channel_buffer_b = client_b
401        .channel_store()
402        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
403        .await
404        .unwrap();
405
406    server.forbid_connections();
407    server.disconnect_client(client_a.peer_id().unwrap());
408    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
409
410    channel_buffer_a.update(cx_a, |buffer, _| {
411        assert_eq!(
412            buffer.channel().as_ref(),
413            &channel(channel_id, "the-channel")
414        );
415        assert!(!buffer.is_connected());
416    });
417
418    deterministic.run_until_parked();
419
420    server.allow_connections();
421    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
422
423    deterministic.run_until_parked();
424
425    client_a
426        .channel_store()
427        .update(cx_a, |channel_store, _| {
428            channel_store.remove_channel(channel_id)
429        })
430        .await
431        .unwrap();
432    deterministic.run_until_parked();
433
434    // Channel buffer observed the deletion
435    channel_buffer_b.update(cx_b, |buffer, _| {
436        assert_eq!(
437            buffer.channel().as_ref(),
438            &channel(channel_id, "the-channel")
439        );
440        assert!(!buffer.is_connected());
441    });
442}
443
444fn channel(id: u64, name: &'static str) -> Channel {
445    Channel {
446        id,
447        name: name.to_string(),
448        unseen_note_version: None,
449        unseen_message_id: None,
450    }
451}
452
453#[gpui::test]
454async fn test_rejoin_channel_buffer(
455    deterministic: Arc<Deterministic>,
456    cx_a: &mut TestAppContext,
457    cx_b: &mut TestAppContext,
458) {
459    deterministic.forbid_parking();
460    let mut server = TestServer::start(&deterministic).await;
461    let client_a = server.create_client(cx_a, "user_a").await;
462    let client_b = server.create_client(cx_b, "user_b").await;
463
464    let channel_id = server
465        .make_channel(
466            "the-channel",
467            None,
468            (&client_a, cx_a),
469            &mut [(&client_b, cx_b)],
470        )
471        .await;
472
473    let channel_buffer_a = client_a
474        .channel_store()
475        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
476        .await
477        .unwrap();
478    let channel_buffer_b = client_b
479        .channel_store()
480        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
481        .await
482        .unwrap();
483
484    channel_buffer_a.update(cx_a, |buffer, cx| {
485        buffer.buffer().update(cx, |buffer, cx| {
486            buffer.edit([(0..0, "1")], None, cx);
487        })
488    });
489    deterministic.run_until_parked();
490
491    // Client A disconnects.
492    server.forbid_connections();
493    server.disconnect_client(client_a.peer_id().unwrap());
494
495    // Both clients make an edit.
496    channel_buffer_a.update(cx_a, |buffer, cx| {
497        buffer.buffer().update(cx, |buffer, cx| {
498            buffer.edit([(1..1, "2")], None, cx);
499        })
500    });
501    channel_buffer_b.update(cx_b, |buffer, cx| {
502        buffer.buffer().update(cx, |buffer, cx| {
503            buffer.edit([(0..0, "0")], None, cx);
504        })
505    });
506
507    // Both clients see their own edit.
508    deterministic.run_until_parked();
509    channel_buffer_a.read_with(cx_a, |buffer, cx| {
510        assert_eq!(buffer.buffer().read(cx).text(), "12");
511    });
512    channel_buffer_b.read_with(cx_b, |buffer, cx| {
513        assert_eq!(buffer.buffer().read(cx).text(), "01");
514    });
515
516    // Client A reconnects. Both clients see each other's edits, and see
517    // the same collaborators.
518    server.allow_connections();
519    deterministic.advance_clock(RECEIVE_TIMEOUT);
520    channel_buffer_a.read_with(cx_a, |buffer, cx| {
521        assert_eq!(buffer.buffer().read(cx).text(), "012");
522    });
523    channel_buffer_b.read_with(cx_b, |buffer, cx| {
524        assert_eq!(buffer.buffer().read(cx).text(), "012");
525    });
526
527    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
528        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
529            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
530        });
531    });
532}
533
534#[gpui::test]
535async fn test_channel_buffers_and_server_restarts(
536    deterministic: Arc<Deterministic>,
537    cx_a: &mut TestAppContext,
538    cx_b: &mut TestAppContext,
539    cx_c: &mut TestAppContext,
540) {
541    deterministic.forbid_parking();
542    let mut server = TestServer::start(&deterministic).await;
543    let client_a = server.create_client(cx_a, "user_a").await;
544    let client_b = server.create_client(cx_b, "user_b").await;
545    let client_c = server.create_client(cx_c, "user_c").await;
546
547    let channel_id = server
548        .make_channel(
549            "the-channel",
550            None,
551            (&client_a, cx_a),
552            &mut [(&client_b, cx_b), (&client_c, cx_c)],
553        )
554        .await;
555
556    let channel_buffer_a = client_a
557        .channel_store()
558        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
559        .await
560        .unwrap();
561    let channel_buffer_b = client_b
562        .channel_store()
563        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
564        .await
565        .unwrap();
566    let _channel_buffer_c = client_c
567        .channel_store()
568        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
569        .await
570        .unwrap();
571
572    channel_buffer_a.update(cx_a, |buffer, cx| {
573        buffer.buffer().update(cx, |buffer, cx| {
574            buffer.edit([(0..0, "1")], None, cx);
575        })
576    });
577    deterministic.run_until_parked();
578
579    // Client C can't reconnect.
580    client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
581
582    // Server stops.
583    server.reset().await;
584    deterministic.advance_clock(RECEIVE_TIMEOUT);
585
586    // While the server is down, both clients make an edit.
587    channel_buffer_a.update(cx_a, |buffer, cx| {
588        buffer.buffer().update(cx, |buffer, cx| {
589            buffer.edit([(1..1, "2")], None, cx);
590        })
591    });
592    channel_buffer_b.update(cx_b, |buffer, cx| {
593        buffer.buffer().update(cx, |buffer, cx| {
594            buffer.edit([(0..0, "0")], None, cx);
595        })
596    });
597
598    // Server restarts.
599    server.start().await.unwrap();
600    deterministic.advance_clock(CLEANUP_TIMEOUT);
601
602    // Clients reconnects. Clients A and B see each other's edits, and see
603    // that client C has disconnected.
604    channel_buffer_a.read_with(cx_a, |buffer, cx| {
605        assert_eq!(buffer.buffer().read(cx).text(), "012");
606    });
607    channel_buffer_b.read_with(cx_b, |buffer, cx| {
608        assert_eq!(buffer.buffer().read(cx).text(), "012");
609    });
610
611    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
612        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
613            assert_collaborators(
614                buffer_a.collaborators(),
615                &[client_a.user_id(), client_b.user_id()],
616            );
617            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
618        });
619    });
620}
621
622#[gpui::test(iterations = 10)]
623async fn test_following_to_channel_notes_without_a_shared_project(
624    deterministic: Arc<Deterministic>,
625    mut cx_a: &mut TestAppContext,
626    mut cx_b: &mut TestAppContext,
627    mut cx_c: &mut TestAppContext,
628) {
629    deterministic.forbid_parking();
630    let mut server = TestServer::start(&deterministic).await;
631    let client_a = server.create_client(cx_a, "user_a").await;
632    let client_b = server.create_client(cx_b, "user_b").await;
633
634    let client_c = server.create_client(cx_c, "user_c").await;
635
636    cx_a.update(editor::init);
637    cx_b.update(editor::init);
638    cx_c.update(editor::init);
639    cx_a.update(collab_ui::channel_view::init);
640    cx_b.update(collab_ui::channel_view::init);
641    cx_c.update(collab_ui::channel_view::init);
642
643    let channel_1_id = server
644        .make_channel(
645            "channel-1",
646            None,
647            (&client_a, cx_a),
648            &mut [(&client_b, cx_b), (&client_c, cx_c)],
649        )
650        .await;
651    let channel_2_id = server
652        .make_channel(
653            "channel-2",
654            None,
655            (&client_a, cx_a),
656            &mut [(&client_b, cx_b), (&client_c, cx_c)],
657        )
658        .await;
659
660    // Clients A, B, and C join a channel.
661    let active_call_a = cx_a.read(ActiveCall::global);
662    let active_call_b = cx_b.read(ActiveCall::global);
663    let active_call_c = cx_c.read(ActiveCall::global);
664    for (call, cx) in [
665        (&active_call_a, &mut cx_a),
666        (&active_call_b, &mut cx_b),
667        (&active_call_c, &mut cx_c),
668    ] {
669        call.update(*cx, |call, cx| call.join_channel(channel_1_id, cx))
670            .await
671            .unwrap();
672    }
673    deterministic.run_until_parked();
674
675    // Clients A, B, and C all open their own unshared projects.
676    client_a.fs().insert_tree("/a", json!({})).await;
677    client_b.fs().insert_tree("/b", json!({})).await;
678    client_c.fs().insert_tree("/c", json!({})).await;
679    let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
680    let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
681    let (project_c, _) = client_b.build_local_project("/c", cx_c).await;
682    let workspace_a = client_a.build_workspace(&project_a, cx_a).root(cx_a);
683    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
684    let _workspace_c = client_c.build_workspace(&project_c, cx_c).root(cx_c);
685
686    active_call_a
687        .update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
688        .await
689        .unwrap();
690
691    // Client A opens the notes for channel 1.
692    let channel_view_1_a = cx_a
693        .update(|cx| ChannelView::open(channel_1_id, workspace_a.clone(), cx))
694        .await
695        .unwrap();
696    channel_view_1_a.update(cx_a, |notes, cx| {
697        assert_eq!(notes.channel(cx).name, "channel-1");
698        notes.editor.update(cx, |editor, cx| {
699            editor.insert("Hello from A.", cx);
700            editor.change_selections(None, cx, |selections| {
701                selections.select_ranges(vec![3..4]);
702            });
703        });
704    });
705
706    // Client B follows client A.
707    workspace_b
708        .update(cx_b, |workspace, cx| {
709            workspace.follow(client_a.peer_id().unwrap(), cx).unwrap()
710        })
711        .await
712        .unwrap();
713
714    // Client B is taken to the notes for channel 1, with the same
715    // text selected as client A.
716    deterministic.run_until_parked();
717    let channel_view_1_b = workspace_b.read_with(cx_b, |workspace, cx| {
718        assert_eq!(
719            workspace.leader_for_pane(workspace.active_pane()),
720            Some(client_a.peer_id().unwrap())
721        );
722        workspace
723            .active_item(cx)
724            .expect("no active item")
725            .downcast::<ChannelView>()
726            .expect("active item is not a channel view")
727    });
728    channel_view_1_b.read_with(cx_b, |notes, cx| {
729        assert_eq!(notes.channel(cx).name, "channel-1");
730        let editor = notes.editor.read(cx);
731        assert_eq!(editor.text(cx), "Hello from A.");
732        assert_eq!(editor.selections.ranges::<usize>(cx), &[3..4]);
733    });
734
735    // Client A opens the notes for channel 2.
736    let channel_view_2_a = cx_a
737        .update(|cx| ChannelView::open(channel_2_id, workspace_a.clone(), cx))
738        .await
739        .unwrap();
740    channel_view_2_a.read_with(cx_a, |notes, cx| {
741        assert_eq!(notes.channel(cx).name, "channel-2");
742    });
743
744    // Client B is taken to the notes for channel 2.
745    deterministic.run_until_parked();
746    let channel_view_2_b = workspace_b.read_with(cx_b, |workspace, cx| {
747        assert_eq!(
748            workspace.leader_for_pane(workspace.active_pane()),
749            Some(client_a.peer_id().unwrap())
750        );
751        workspace
752            .active_item(cx)
753            .expect("no active item")
754            .downcast::<ChannelView>()
755            .expect("active item is not a channel view")
756    });
757    channel_view_2_b.read_with(cx_b, |notes, cx| {
758        assert_eq!(notes.channel(cx).name, "channel-2");
759    });
760}
761
762#[gpui::test]
763async fn test_channel_buffer_changes(
764    deterministic: Arc<Deterministic>,
765    cx_a: &mut TestAppContext,
766    cx_b: &mut TestAppContext,
767) {
768    deterministic.forbid_parking();
769    let mut server = TestServer::start(&deterministic).await;
770    let client_a = server.create_client(cx_a, "user_a").await;
771    let client_b = server.create_client(cx_b, "user_b").await;
772
773    let channel_id = server
774        .make_channel(
775            "the-channel",
776            None,
777            (&client_a, cx_a),
778            &mut [(&client_b, cx_b)],
779        )
780        .await;
781
782    let channel_buffer_a = client_a
783        .channel_store()
784        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
785        .await
786        .unwrap();
787
788    // Client A makes an edit, and client B should see that the note has changed.
789    channel_buffer_a.update(cx_a, |buffer, cx| {
790        buffer.buffer().update(cx, |buffer, cx| {
791            buffer.edit([(0..0, "1")], None, cx);
792        })
793    });
794    deterministic.run_until_parked();
795
796    let has_buffer_changed = cx_b.read(|cx| {
797        client_b
798            .channel_store()
799            .read(cx)
800            .has_channel_buffer_changed(channel_id)
801            .unwrap()
802    });
803    assert!(has_buffer_changed);
804
805    // Opening the buffer should clear the changed flag.
806    let project_b = client_b.build_empty_local_project(cx_b);
807    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
808    let channel_view_b = cx_b
809        .update(|cx| ChannelView::open(channel_id, workspace_b.clone(), cx))
810        .await
811        .unwrap();
812    deterministic.run_until_parked();
813
814    let has_buffer_changed = cx_b.read(|cx| {
815        client_b
816            .channel_store()
817            .read(cx)
818            .has_channel_buffer_changed(channel_id)
819            .unwrap()
820    });
821    assert!(!has_buffer_changed);
822
823    // Editing the channel while the buffer is open should not show that the buffer has changed.
824    channel_buffer_a.update(cx_a, |buffer, cx| {
825        buffer.buffer().update(cx, |buffer, cx| {
826            buffer.edit([(0..0, "2")], None, cx);
827        })
828    });
829    deterministic.run_until_parked();
830
831    let has_buffer_changed = cx_b.read(|cx| {
832        client_b
833            .channel_store()
834            .read(cx)
835            .has_channel_buffer_changed(channel_id)
836            .unwrap()
837    });
838    assert!(!has_buffer_changed);
839
840    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
841
842    // Test that the server is tracking things correctly, and we retain our 'not changed'
843    // state across a disconnect
844    server.simulate_long_connection_interruption(client_b.peer_id().unwrap(), &deterministic);
845    let has_buffer_changed = cx_b.read(|cx| {
846        client_b
847            .channel_store()
848            .read(cx)
849            .has_channel_buffer_changed(channel_id)
850            .unwrap()
851    });
852    assert!(!has_buffer_changed);
853
854    // Closing the buffer should re-enable change tracking
855    cx_b.update(|cx| {
856        workspace_b.update(cx, |workspace, cx| {
857            workspace.close_all_items_and_panes(&Default::default(), cx)
858        });
859
860        drop(channel_view_b)
861    });
862
863    deterministic.run_until_parked();
864
865    channel_buffer_a.update(cx_a, |buffer, cx| {
866        buffer.buffer().update(cx, |buffer, cx| {
867            buffer.edit([(0..0, "3")], None, cx);
868        })
869    });
870    deterministic.run_until_parked();
871
872    let has_buffer_changed = cx_b.read(|cx| {
873        client_b
874            .channel_store()
875            .read(cx)
876            .has_channel_buffer_changed(channel_id)
877            .unwrap()
878    });
879    assert!(has_buffer_changed);
880}
881
882#[track_caller]
883fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
884    let mut user_ids = collaborators
885        .values()
886        .map(|collaborator| collaborator.user_id)
887        .collect::<Vec<_>>();
888    user_ids.sort();
889    assert_eq!(
890        user_ids,
891        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
892    );
893}
894
895fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
896    channel_buffer.read_with(cx, |buffer, _| buffer.text())
897}