channel_buffer_tests.rs

  1use crate::{
  2    rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
  3    tests::TestServer,
  4};
  5use call::ActiveCall;
  6use channel::Channel;
  7use client::UserId;
  8use collab_ui::channel_view::ChannelView;
  9use collections::HashMap;
 10use futures::future;
 11use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
 12use rpc::{proto, RECEIVE_TIMEOUT};
 13use serde_json::json;
 14use std::sync::Arc;
 15
 16#[gpui::test]
 17async fn test_core_channel_buffers(
 18    deterministic: Arc<Deterministic>,
 19    cx_a: &mut TestAppContext,
 20    cx_b: &mut TestAppContext,
 21) {
 22    deterministic.forbid_parking();
 23    let mut server = TestServer::start(&deterministic).await;
 24    let client_a = server.create_client(cx_a, "user_a").await;
 25    let client_b = server.create_client(cx_b, "user_b").await;
 26
 27    let channel_id = server
 28        .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
 29        .await;
 30
 31    // Client A joins the channel buffer
 32    let channel_buffer_a = client_a
 33        .channel_store()
 34        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 35        .await
 36        .unwrap();
 37
 38    // Client A edits the buffer
 39    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 40    buffer_a.update(cx_a, |buffer, cx| {
 41        buffer.edit([(0..0, "hello world")], None, cx)
 42    });
 43    buffer_a.update(cx_a, |buffer, cx| {
 44        buffer.edit([(5..5, ", cruel")], None, cx)
 45    });
 46    buffer_a.update(cx_a, |buffer, cx| {
 47        buffer.edit([(0..5, "goodbye")], None, cx)
 48    });
 49    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 50    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 51    deterministic.run_until_parked();
 52
 53    // Client B joins the channel buffer
 54    let channel_buffer_b = client_b
 55        .channel_store()
 56        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 57        .await
 58        .unwrap();
 59    channel_buffer_b.read_with(cx_b, |buffer, _| {
 60        assert_collaborators(
 61            buffer.collaborators(),
 62            &[client_a.user_id(), client_b.user_id()],
 63        );
 64    });
 65
 66    // Client B sees the correct text, and then edits it
 67    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 68    assert_eq!(
 69        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 70        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 71    );
 72    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 73    buffer_b.update(cx_b, |buffer, cx| {
 74        buffer.edit([(7..12, "beautiful")], None, cx)
 75    });
 76
 77    // Both A and B see the new edit
 78    deterministic.run_until_parked();
 79    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 80    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 81
 82    // Client A closes the channel buffer.
 83    cx_a.update(|_| drop(channel_buffer_a));
 84    deterministic.run_until_parked();
 85
 86    // Client B sees that client A is gone from the channel buffer.
 87    channel_buffer_b.read_with(cx_b, |buffer, _| {
 88        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
 89    });
 90
 91    // Client A rejoins the channel buffer
 92    let _channel_buffer_a = client_a
 93        .channel_store()
 94        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 95        .await
 96        .unwrap();
 97    deterministic.run_until_parked();
 98
 99    // Sanity test, make sure we saw A rejoining
100    channel_buffer_b.read_with(cx_b, |buffer, _| {
101        assert_collaborators(
102            &buffer.collaborators(),
103            &[client_b.user_id(), client_a.user_id()],
104        );
105    });
106
107    // Client A loses connection.
108    server.forbid_connections();
109    server.disconnect_client(client_a.peer_id().unwrap());
110    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
111
112    // Client B observes A disconnect
113    channel_buffer_b.read_with(cx_b, |buffer, _| {
114        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
115    });
116
117    // TODO:
118    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
119    // - Test interaction with channel deletion while buffer is open
120}
121
122#[gpui::test]
123async fn test_channel_buffer_replica_ids(
124    deterministic: Arc<Deterministic>,
125    cx_a: &mut TestAppContext,
126    cx_b: &mut TestAppContext,
127    cx_c: &mut TestAppContext,
128) {
129    deterministic.forbid_parking();
130    let mut server = TestServer::start(&deterministic).await;
131    let client_a = server.create_client(cx_a, "user_a").await;
132    let client_b = server.create_client(cx_b, "user_b").await;
133    let client_c = server.create_client(cx_c, "user_c").await;
134
135    let channel_id = server
136        .make_channel(
137            "the-channel",
138            None,
139            (&client_a, cx_a),
140            &mut [(&client_b, cx_b), (&client_c, cx_c)],
141        )
142        .await;
143
144    let active_call_a = cx_a.read(ActiveCall::global);
145    let active_call_b = cx_b.read(ActiveCall::global);
146    let active_call_c = cx_c.read(ActiveCall::global);
147
148    // Clients A and B join a channel.
149    active_call_a
150        .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
151        .await
152        .unwrap();
153    active_call_b
154        .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
155        .await
156        .unwrap();
157
158    // Clients A, B, and C join a channel buffer
159    // C first so that the replica IDs in the project and the channel buffer are different
160    let channel_buffer_c = client_c
161        .channel_store()
162        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
163        .await
164        .unwrap();
165    let channel_buffer_b = client_b
166        .channel_store()
167        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
168        .await
169        .unwrap();
170    let channel_buffer_a = client_a
171        .channel_store()
172        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
173        .await
174        .unwrap();
175
176    // Client B shares a project
177    client_b
178        .fs()
179        .insert_tree("/dir", json!({ "file.txt": "contents" }))
180        .await;
181    let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
182    let shared_project_id = active_call_b
183        .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
184        .await
185        .unwrap();
186
187    // Client A joins the project
188    let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
189    deterministic.run_until_parked();
190
191    // Client C is in a separate project.
192    client_c.fs().insert_tree("/dir", json!({})).await;
193    let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
194
195    // Note that each user has a different replica id in the projects vs the
196    // channel buffer.
197    channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
198        assert_eq!(project_a.read(cx).replica_id(), 1);
199        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
200    });
201    channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
202        assert_eq!(project_b.read(cx).replica_id(), 0);
203        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
204    });
205    channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
206        // C is not in the project
207        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
208    });
209
210    let channel_window_a =
211        cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
212    let channel_window_b =
213        cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
214    let channel_window_c = cx_c.add_window(|cx| {
215        ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
216    });
217
218    let channel_view_a = channel_window_a.root(cx_a);
219    let channel_view_b = channel_window_b.root(cx_b);
220    let channel_view_c = channel_window_c.root(cx_c);
221
222    // For clients A and B, the replica ids in the channel buffer are mapped
223    // so that they match the same users' replica ids in their shared project.
224    channel_view_a.read_with(cx_a, |view, cx| {
225        assert_eq!(
226            view.editor.read(cx).replica_id_map().unwrap(),
227            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
228        );
229    });
230    channel_view_b.read_with(cx_b, |view, cx| {
231        assert_eq!(
232            view.editor.read(cx).replica_id_map().unwrap(),
233            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
234        )
235    });
236
237    // Client C only sees themself, as they're not part of any shared project
238    channel_view_c.read_with(cx_c, |view, cx| {
239        assert_eq!(
240            view.editor.read(cx).replica_id_map().unwrap(),
241            &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
242        );
243    });
244
245    // Client C joins the project that clients A and B are in.
246    active_call_c
247        .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
248        .await
249        .unwrap();
250    let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
251    deterministic.run_until_parked();
252    project_c.read_with(cx_c, |project, _| {
253        assert_eq!(project.replica_id(), 2);
254    });
255
256    // For clients A and B, client C's replica id in the channel buffer is
257    // now mapped to their replica id in the shared project.
258    channel_view_a.read_with(cx_a, |view, cx| {
259        assert_eq!(
260            view.editor.read(cx).replica_id_map().unwrap(),
261            &[(1, 0), (2, 1), (0, 2)]
262                .into_iter()
263                .collect::<HashMap<_, _>>()
264        );
265    });
266    channel_view_b.read_with(cx_b, |view, cx| {
267        assert_eq!(
268            view.editor.read(cx).replica_id_map().unwrap(),
269            &[(1, 0), (2, 1), (0, 2)]
270                .into_iter()
271                .collect::<HashMap<_, _>>(),
272        )
273    });
274}
275
276#[gpui::test]
277async fn test_multiple_handles_to_channel_buffer(
278    deterministic: Arc<Deterministic>,
279    cx_a: &mut TestAppContext,
280) {
281    deterministic.forbid_parking();
282    let mut server = TestServer::start(&deterministic).await;
283    let client_a = server.create_client(cx_a, "user_a").await;
284
285    let channel_id = server
286        .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
287        .await;
288
289    let channel_buffer_1 = client_a
290        .channel_store()
291        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
292    let channel_buffer_2 = client_a
293        .channel_store()
294        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
295    let channel_buffer_3 = client_a
296        .channel_store()
297        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
298
299    // All concurrent tasks for opening a channel buffer return the same model handle.
300    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
301        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
302            .await
303            .unwrap();
304    let channel_buffer_model_id = channel_buffer.id();
305    assert_eq!(channel_buffer, channel_buffer_2);
306    assert_eq!(channel_buffer, channel_buffer_3);
307
308    channel_buffer.update(cx_a, |buffer, cx| {
309        buffer.buffer().update(cx, |buffer, cx| {
310            buffer.edit([(0..0, "hello")], None, cx);
311        })
312    });
313    deterministic.run_until_parked();
314
315    cx_a.update(|_| {
316        drop(channel_buffer);
317        drop(channel_buffer_2);
318        drop(channel_buffer_3);
319    });
320    deterministic.run_until_parked();
321
322    // The channel buffer can be reopened after dropping it.
323    let channel_buffer = client_a
324        .channel_store()
325        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
326        .await
327        .unwrap();
328    assert_ne!(channel_buffer.id(), channel_buffer_model_id);
329    channel_buffer.update(cx_a, |buffer, cx| {
330        buffer.buffer().update(cx, |buffer, _| {
331            assert_eq!(buffer.text(), "hello");
332        })
333    });
334}
335
336#[gpui::test]
337async fn test_channel_buffer_disconnect(
338    deterministic: Arc<Deterministic>,
339    cx_a: &mut TestAppContext,
340    cx_b: &mut TestAppContext,
341) {
342    deterministic.forbid_parking();
343    let mut server = TestServer::start(&deterministic).await;
344    let client_a = server.create_client(cx_a, "user_a").await;
345    let client_b = server.create_client(cx_b, "user_b").await;
346
347    let channel_id = server
348        .make_channel(
349            "the-channel",
350            None,
351            (&client_a, cx_a),
352            &mut [(&client_b, cx_b)],
353        )
354        .await;
355
356    let channel_buffer_a = client_a
357        .channel_store()
358        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
359        .await
360        .unwrap();
361    let channel_buffer_b = client_b
362        .channel_store()
363        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
364        .await
365        .unwrap();
366
367    server.forbid_connections();
368    server.disconnect_client(client_a.peer_id().unwrap());
369    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
370
371    channel_buffer_a.update(cx_a, |buffer, _| {
372        assert_eq!(
373            buffer.channel().as_ref(),
374            &Channel {
375                id: channel_id,
376                name: "the-channel".to_string()
377            }
378        );
379        assert!(!buffer.is_connected());
380    });
381
382    deterministic.run_until_parked();
383
384    server.allow_connections();
385    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
386
387    deterministic.run_until_parked();
388
389    client_a
390        .channel_store()
391        .update(cx_a, |channel_store, _| {
392            channel_store.remove_channel(channel_id)
393        })
394        .await
395        .unwrap();
396    deterministic.run_until_parked();
397
398    // Channel buffer observed the deletion
399    channel_buffer_b.update(cx_b, |buffer, _| {
400        assert_eq!(
401            buffer.channel().as_ref(),
402            &Channel {
403                id: channel_id,
404                name: "the-channel".to_string()
405            }
406        );
407        assert!(!buffer.is_connected());
408    });
409}
410
411#[gpui::test]
412async fn test_rejoin_channel_buffer(
413    deterministic: Arc<Deterministic>,
414    cx_a: &mut TestAppContext,
415    cx_b: &mut TestAppContext,
416) {
417    deterministic.forbid_parking();
418    let mut server = TestServer::start(&deterministic).await;
419    let client_a = server.create_client(cx_a, "user_a").await;
420    let client_b = server.create_client(cx_b, "user_b").await;
421
422    let channel_id = server
423        .make_channel(
424            "the-channel",
425            None,
426            (&client_a, cx_a),
427            &mut [(&client_b, cx_b)],
428        )
429        .await;
430
431    let channel_buffer_a = client_a
432        .channel_store()
433        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
434        .await
435        .unwrap();
436    let channel_buffer_b = client_b
437        .channel_store()
438        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
439        .await
440        .unwrap();
441
442    channel_buffer_a.update(cx_a, |buffer, cx| {
443        buffer.buffer().update(cx, |buffer, cx| {
444            buffer.edit([(0..0, "1")], None, cx);
445        })
446    });
447    deterministic.run_until_parked();
448
449    // Client A disconnects.
450    server.forbid_connections();
451    server.disconnect_client(client_a.peer_id().unwrap());
452
453    // Both clients make an edit.
454    channel_buffer_a.update(cx_a, |buffer, cx| {
455        buffer.buffer().update(cx, |buffer, cx| {
456            buffer.edit([(1..1, "2")], None, cx);
457        })
458    });
459    channel_buffer_b.update(cx_b, |buffer, cx| {
460        buffer.buffer().update(cx, |buffer, cx| {
461            buffer.edit([(0..0, "0")], None, cx);
462        })
463    });
464
465    // Both clients see their own edit.
466    deterministic.run_until_parked();
467    channel_buffer_a.read_with(cx_a, |buffer, cx| {
468        assert_eq!(buffer.buffer().read(cx).text(), "12");
469    });
470    channel_buffer_b.read_with(cx_b, |buffer, cx| {
471        assert_eq!(buffer.buffer().read(cx).text(), "01");
472    });
473
474    // Client A reconnects. Both clients see each other's edits, and see
475    // the same collaborators.
476    server.allow_connections();
477    deterministic.advance_clock(RECEIVE_TIMEOUT);
478    channel_buffer_a.read_with(cx_a, |buffer, cx| {
479        assert_eq!(buffer.buffer().read(cx).text(), "012");
480    });
481    channel_buffer_b.read_with(cx_b, |buffer, cx| {
482        assert_eq!(buffer.buffer().read(cx).text(), "012");
483    });
484
485    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
486        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
487            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
488        });
489    });
490}
491
492#[gpui::test]
493async fn test_channel_buffers_and_server_restarts(
494    deterministic: Arc<Deterministic>,
495    cx_a: &mut TestAppContext,
496    cx_b: &mut TestAppContext,
497    cx_c: &mut TestAppContext,
498) {
499    deterministic.forbid_parking();
500    let mut server = TestServer::start(&deterministic).await;
501    let client_a = server.create_client(cx_a, "user_a").await;
502    let client_b = server.create_client(cx_b, "user_b").await;
503    let client_c = server.create_client(cx_c, "user_c").await;
504
505    let channel_id = server
506        .make_channel(
507            "the-channel",
508            None,
509            (&client_a, cx_a),
510            &mut [(&client_b, cx_b), (&client_c, cx_c)],
511        )
512        .await;
513
514    let channel_buffer_a = client_a
515        .channel_store()
516        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
517        .await
518        .unwrap();
519    let channel_buffer_b = client_b
520        .channel_store()
521        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
522        .await
523        .unwrap();
524    let _channel_buffer_c = client_c
525        .channel_store()
526        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
527        .await
528        .unwrap();
529
530    channel_buffer_a.update(cx_a, |buffer, cx| {
531        buffer.buffer().update(cx, |buffer, cx| {
532            buffer.edit([(0..0, "1")], None, cx);
533        })
534    });
535    deterministic.run_until_parked();
536
537    // Client C can't reconnect.
538    client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
539
540    // Server stops.
541    server.reset().await;
542    deterministic.advance_clock(RECEIVE_TIMEOUT);
543
544    // While the server is down, both clients make an edit.
545    channel_buffer_a.update(cx_a, |buffer, cx| {
546        buffer.buffer().update(cx, |buffer, cx| {
547            buffer.edit([(1..1, "2")], None, cx);
548        })
549    });
550    channel_buffer_b.update(cx_b, |buffer, cx| {
551        buffer.buffer().update(cx, |buffer, cx| {
552            buffer.edit([(0..0, "0")], None, cx);
553        })
554    });
555
556    // Server restarts.
557    server.start().await.unwrap();
558    deterministic.advance_clock(CLEANUP_TIMEOUT);
559
560    // Clients reconnects. Clients A and B see each other's edits, and see
561    // that client C has disconnected.
562    channel_buffer_a.read_with(cx_a, |buffer, cx| {
563        assert_eq!(buffer.buffer().read(cx).text(), "012");
564    });
565    channel_buffer_b.read_with(cx_b, |buffer, cx| {
566        assert_eq!(buffer.buffer().read(cx).text(), "012");
567    });
568
569    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
570        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
571            assert_eq!(
572                buffer_a
573                    .collaborators()
574                    .iter()
575                    .map(|c| c.user_id)
576                    .collect::<Vec<_>>(),
577                vec![client_a.user_id().unwrap(), client_b.user_id().unwrap()]
578            );
579            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
580        });
581    });
582}
583
584#[gpui::test(iterations = 10)]
585async fn test_following_to_channel_notes_without_a_shared_project(
586    deterministic: Arc<Deterministic>,
587    mut cx_a: &mut TestAppContext,
588    mut cx_b: &mut TestAppContext,
589    mut cx_c: &mut TestAppContext,
590) {
591    deterministic.forbid_parking();
592    let mut server = TestServer::start(&deterministic).await;
593    let client_a = server.create_client(cx_a, "user_a").await;
594    let client_b = server.create_client(cx_b, "user_b").await;
595    let client_c = server.create_client(cx_c, "user_c").await;
596
597    cx_a.update(editor::init);
598    cx_b.update(editor::init);
599    cx_c.update(editor::init);
600    cx_a.update(collab_ui::channel_view::init);
601    cx_b.update(collab_ui::channel_view::init);
602    cx_c.update(collab_ui::channel_view::init);
603
604    let channel_1_id = server
605        .make_channel(
606            "channel-1",
607            None,
608            (&client_a, cx_a),
609            &mut [(&client_b, cx_b), (&client_c, cx_c)],
610        )
611        .await;
612    let channel_2_id = server
613        .make_channel(
614            "channel-2",
615            None,
616            (&client_a, cx_a),
617            &mut [(&client_b, cx_b), (&client_c, cx_c)],
618        )
619        .await;
620
621    // Clients A, B, and C join a channel.
622    let active_call_a = cx_a.read(ActiveCall::global);
623    let active_call_b = cx_b.read(ActiveCall::global);
624    let active_call_c = cx_c.read(ActiveCall::global);
625    for (call, cx) in [
626        (&active_call_a, &mut cx_a),
627        (&active_call_b, &mut cx_b),
628        (&active_call_c, &mut cx_c),
629    ] {
630        call.update(*cx, |call, cx| call.join_channel(channel_1_id, cx))
631            .await
632            .unwrap();
633    }
634    deterministic.run_until_parked();
635
636    // Clients A, B, and C all open their own unshared projects.
637    client_a.fs().insert_tree("/a", json!({})).await;
638    client_b.fs().insert_tree("/b", json!({})).await;
639    client_c.fs().insert_tree("/c", json!({})).await;
640    let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
641    let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
642    let (project_c, _) = client_b.build_local_project("/c", cx_c).await;
643    let workspace_a = client_a.build_workspace(&project_a, cx_a).root(cx_a);
644    let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
645    let _workspace_c = client_c.build_workspace(&project_c, cx_c).root(cx_c);
646
647    active_call_a
648        .update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
649        .await
650        .unwrap();
651
652    // Client A opens the notes for channel 1.
653    let channel_view_1_a = cx_a
654        .update(|cx| ChannelView::open(channel_1_id, workspace_a.clone(), cx))
655        .await
656        .unwrap();
657    channel_view_1_a.update(cx_a, |notes, cx| {
658        assert_eq!(notes.channel(cx).name, "channel-1");
659        notes.editor.update(cx, |editor, cx| {
660            editor.insert("Hello from A.", cx);
661            editor.change_selections(None, cx, |selections| {
662                selections.select_ranges(vec![3..4]);
663            });
664        });
665    });
666
667    // Client B follows client A.
668    workspace_b
669        .update(cx_b, |workspace, cx| {
670            workspace
671                .toggle_follow(client_a.peer_id().unwrap(), cx)
672                .unwrap()
673        })
674        .await
675        .unwrap();
676
677    // Client B is taken to the notes for channel 1, with the same
678    // text selected as client A.
679    deterministic.run_until_parked();
680    let channel_view_1_b = workspace_b.read_with(cx_b, |workspace, cx| {
681        assert_eq!(
682            workspace.leader_for_pane(workspace.active_pane()),
683            Some(client_a.peer_id().unwrap())
684        );
685        workspace
686            .active_item(cx)
687            .expect("no active item")
688            .downcast::<ChannelView>()
689            .expect("active item is not a channel view")
690    });
691    channel_view_1_b.read_with(cx_b, |notes, cx| {
692        assert_eq!(notes.channel(cx).name, "channel-1");
693        let editor = notes.editor.read(cx);
694        assert_eq!(editor.text(cx), "Hello from A.");
695        assert_eq!(editor.selections.ranges::<usize>(cx), &[3..4]);
696    });
697
698    // Client A opens the notes for channel 2.
699    let channel_view_2_a = cx_a
700        .update(|cx| ChannelView::open(channel_2_id, workspace_a.clone(), cx))
701        .await
702        .unwrap();
703    channel_view_2_a.read_with(cx_a, |notes, cx| {
704        assert_eq!(notes.channel(cx).name, "channel-2");
705    });
706
707    // Client B is taken to the notes for channel 2.
708    deterministic.run_until_parked();
709    let channel_view_2_b = workspace_b.read_with(cx_b, |workspace, cx| {
710        assert_eq!(
711            workspace.leader_for_pane(workspace.active_pane()),
712            Some(client_a.peer_id().unwrap())
713        );
714        workspace
715            .active_item(cx)
716            .expect("no active item")
717            .downcast::<ChannelView>()
718            .expect("active item is not a channel view")
719    });
720    channel_view_2_b.read_with(cx_b, |notes, cx| {
721        assert_eq!(notes.channel(cx).name, "channel-2");
722    });
723}
724
725#[track_caller]
726fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
727    assert_eq!(
728        collaborators
729            .into_iter()
730            .map(|collaborator| collaborator.user_id)
731            .collect::<Vec<_>>(),
732        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
733    );
734}
735
736fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
737    channel_buffer.read_with(cx, |buffer, _| buffer.text())
738}