channel_buffer_tests.rs

  1use crate::{
  2    rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
  3    tests::TestServer,
  4};
  5use call::ActiveCall;
  6use channel::Channel;
  7use client::UserId;
  8use collab_ui::channel_view::ChannelView;
  9use collections::HashMap;
 10use futures::future;
 11use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
 12use rpc::{proto, RECEIVE_TIMEOUT};
 13use serde_json::json;
 14use std::sync::Arc;
 15
 16#[gpui::test]
 17async fn test_core_channel_buffers(
 18    deterministic: Arc<Deterministic>,
 19    cx_a: &mut TestAppContext,
 20    cx_b: &mut TestAppContext,
 21) {
 22    deterministic.forbid_parking();
 23    let mut server = TestServer::start(&deterministic).await;
 24    let client_a = server.create_client(cx_a, "user_a").await;
 25    let client_b = server.create_client(cx_b, "user_b").await;
 26
 27    let channel_id = server
 28        .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
 29        .await;
 30
 31    // Client A joins the channel buffer
 32    let channel_buffer_a = client_a
 33        .channel_store()
 34        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 35        .await
 36        .unwrap();
 37
 38    // Client A edits the buffer
 39    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 40    buffer_a.update(cx_a, |buffer, cx| {
 41        buffer.edit([(0..0, "hello world")], None, cx)
 42    });
 43    buffer_a.update(cx_a, |buffer, cx| {
 44        buffer.edit([(5..5, ", cruel")], None, cx)
 45    });
 46    buffer_a.update(cx_a, |buffer, cx| {
 47        buffer.edit([(0..5, "goodbye")], None, cx)
 48    });
 49    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 50    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 51    deterministic.run_until_parked();
 52
 53    // Client B joins the channel buffer
 54    let channel_buffer_b = client_b
 55        .channel_store()
 56        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 57        .await
 58        .unwrap();
 59    channel_buffer_b.read_with(cx_b, |buffer, _| {
 60        assert_collaborators(
 61            buffer.collaborators(),
 62            &[client_a.user_id(), client_b.user_id()],
 63        );
 64    });
 65
 66    // Client B sees the correct text, and then edits it
 67    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 68    assert_eq!(
 69        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 70        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 71    );
 72    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 73    buffer_b.update(cx_b, |buffer, cx| {
 74        buffer.edit([(7..12, "beautiful")], None, cx)
 75    });
 76
 77    // Both A and B see the new edit
 78    deterministic.run_until_parked();
 79    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 80    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 81
 82    // Client A closes the channel buffer.
 83    cx_a.update(|_| drop(channel_buffer_a));
 84    deterministic.run_until_parked();
 85
 86    // Client B sees that client A is gone from the channel buffer.
 87    channel_buffer_b.read_with(cx_b, |buffer, _| {
 88        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
 89    });
 90
 91    // Client A rejoins the channel buffer
 92    let _channel_buffer_a = client_a
 93        .channel_store()
 94        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 95        .await
 96        .unwrap();
 97    deterministic.run_until_parked();
 98
 99    // Sanity test, make sure we saw A rejoining
100    channel_buffer_b.read_with(cx_b, |buffer, _| {
101        assert_collaborators(
102            &buffer.collaborators(),
103            &[client_b.user_id(), client_a.user_id()],
104        );
105    });
106
107    // Client A loses connection.
108    server.forbid_connections();
109    server.disconnect_client(client_a.peer_id().unwrap());
110    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
111
112    // Client B observes A disconnect
113    channel_buffer_b.read_with(cx_b, |buffer, _| {
114        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
115    });
116
117    // TODO:
118    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
119    // - Test interaction with channel deletion while buffer is open
120}
121
122#[gpui::test]
123async fn test_channel_buffer_replica_ids(
124    deterministic: Arc<Deterministic>,
125    cx_a: &mut TestAppContext,
126    cx_b: &mut TestAppContext,
127    cx_c: &mut TestAppContext,
128) {
129    deterministic.forbid_parking();
130    let mut server = TestServer::start(&deterministic).await;
131    let client_a = server.create_client(cx_a, "user_a").await;
132    let client_b = server.create_client(cx_b, "user_b").await;
133    let client_c = server.create_client(cx_c, "user_c").await;
134
135    let channel_id = server
136        .make_channel(
137            "the-channel",
138            (&client_a, cx_a),
139            &mut [(&client_b, cx_b), (&client_c, cx_c)],
140        )
141        .await;
142
143    let active_call_a = cx_a.read(ActiveCall::global);
144    let active_call_b = cx_b.read(ActiveCall::global);
145    let active_call_c = cx_c.read(ActiveCall::global);
146
147    // Clients A and B join a channel.
148    active_call_a
149        .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
150        .await
151        .unwrap();
152    active_call_b
153        .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
154        .await
155        .unwrap();
156
157    // Clients A, B, and C join a channel buffer
158    // C first so that the replica IDs in the project and the channel buffer are different
159    let channel_buffer_c = client_c
160        .channel_store()
161        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
162        .await
163        .unwrap();
164    let channel_buffer_b = client_b
165        .channel_store()
166        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
167        .await
168        .unwrap();
169    let channel_buffer_a = client_a
170        .channel_store()
171        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
172        .await
173        .unwrap();
174
175    // Client B shares a project
176    client_b
177        .fs()
178        .insert_tree("/dir", json!({ "file.txt": "contents" }))
179        .await;
180    let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
181    let shared_project_id = active_call_b
182        .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
183        .await
184        .unwrap();
185
186    // Client A joins the project
187    let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
188    deterministic.run_until_parked();
189
190    // Client C is in a separate project.
191    client_c.fs().insert_tree("/dir", json!({})).await;
192    let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
193
194    // Note that each user has a different replica id in the projects vs the
195    // channel buffer.
196    channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
197        assert_eq!(project_a.read(cx).replica_id(), 1);
198        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
199    });
200    channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
201        assert_eq!(project_b.read(cx).replica_id(), 0);
202        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
203    });
204    channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
205        // C is not in the project
206        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
207    });
208
209    let channel_window_a =
210        cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
211    let channel_window_b =
212        cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
213    let channel_window_c = cx_c.add_window(|cx| {
214        ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
215    });
216
217    let channel_view_a = channel_window_a.root(cx_a);
218    let channel_view_b = channel_window_b.root(cx_b);
219    let channel_view_c = channel_window_c.root(cx_c);
220
221    // For clients A and B, the replica ids in the channel buffer are mapped
222    // so that they match the same users' replica ids in their shared project.
223    channel_view_a.read_with(cx_a, |view, cx| {
224        assert_eq!(
225            view.editor.read(cx).replica_id_map().unwrap(),
226            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
227        );
228    });
229    channel_view_b.read_with(cx_b, |view, cx| {
230        assert_eq!(
231            view.editor.read(cx).replica_id_map().unwrap(),
232            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
233        )
234    });
235
236    // Client C only sees themself, as they're not part of any shared project
237    channel_view_c.read_with(cx_c, |view, cx| {
238        assert_eq!(
239            view.editor.read(cx).replica_id_map().unwrap(),
240            &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
241        );
242    });
243
244    // Client C joins the project that clients A and B are in.
245    active_call_c
246        .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
247        .await
248        .unwrap();
249    let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
250    deterministic.run_until_parked();
251    project_c.read_with(cx_c, |project, _| {
252        assert_eq!(project.replica_id(), 2);
253    });
254
255    // For clients A and B, client C's replica id in the channel buffer is
256    // now mapped to their replica id in the shared project.
257    channel_view_a.read_with(cx_a, |view, cx| {
258        assert_eq!(
259            view.editor.read(cx).replica_id_map().unwrap(),
260            &[(1, 0), (2, 1), (0, 2)]
261                .into_iter()
262                .collect::<HashMap<_, _>>()
263        );
264    });
265    channel_view_b.read_with(cx_b, |view, cx| {
266        assert_eq!(
267            view.editor.read(cx).replica_id_map().unwrap(),
268            &[(1, 0), (2, 1), (0, 2)]
269                .into_iter()
270                .collect::<HashMap<_, _>>(),
271        )
272    });
273}
274
275#[gpui::test]
276async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
277    deterministic.forbid_parking();
278    let mut server = TestServer::start(&deterministic).await;
279    let client_a = server.create_client(cx_a, "user_a").await;
280
281    let channel_id = server
282        .make_channel("the-channel", (&client_a, cx_a), &mut [])
283        .await;
284
285    let channel_buffer_1 = client_a
286        .channel_store()
287        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
288    let channel_buffer_2 = client_a
289        .channel_store()
290        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
291    let channel_buffer_3 = client_a
292        .channel_store()
293        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
294
295    // All concurrent tasks for opening a channel buffer return the same model handle.
296    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
297        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
298            .await
299            .unwrap();
300    let channel_buffer_model_id = channel_buffer.id();
301    assert_eq!(channel_buffer, channel_buffer_2);
302    assert_eq!(channel_buffer, channel_buffer_3);
303
304    channel_buffer.update(cx_a, |buffer, cx| {
305        buffer.buffer().update(cx, |buffer, cx| {
306            buffer.edit([(0..0, "hello")], None, cx);
307        })
308    });
309    deterministic.run_until_parked();
310
311    cx_a.update(|_| {
312        drop(channel_buffer);
313        drop(channel_buffer_2);
314        drop(channel_buffer_3);
315    });
316    deterministic.run_until_parked();
317
318    // The channel buffer can be reopened after dropping it.
319    let channel_buffer = client_a
320        .channel_store()
321        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
322        .await
323        .unwrap();
324    assert_ne!(channel_buffer.id(), channel_buffer_model_id);
325    channel_buffer.update(cx_a, |buffer, cx| {
326        buffer.buffer().update(cx, |buffer, _| {
327            assert_eq!(buffer.text(), "hello");
328        })
329    });
330}
331
332#[gpui::test]
333async fn test_channel_buffer_disconnect(
334    deterministic: Arc<Deterministic>,
335    cx_a: &mut TestAppContext,
336    cx_b: &mut TestAppContext,
337) {
338    deterministic.forbid_parking();
339    let mut server = TestServer::start(&deterministic).await;
340    let client_a = server.create_client(cx_a, "user_a").await;
341    let client_b = server.create_client(cx_b, "user_b").await;
342
343    let channel_id = server
344        .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)])
345        .await;
346
347    let channel_buffer_a = client_a
348        .channel_store()
349        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
350        .await
351        .unwrap();
352    let channel_buffer_b = client_b
353        .channel_store()
354        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
355        .await
356        .unwrap();
357
358    server.forbid_connections();
359    server.disconnect_client(client_a.peer_id().unwrap());
360    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
361
362    channel_buffer_a.update(cx_a, |buffer, _| {
363        assert_eq!(
364            buffer.channel().as_ref(),
365            &Channel {
366                id: channel_id,
367                name: "the-channel".to_string()
368            }
369        );
370        assert!(!buffer.is_connected());
371    });
372
373    deterministic.run_until_parked();
374
375    server.allow_connections();
376    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
377
378    deterministic.run_until_parked();
379
380    client_a
381        .channel_store()
382        .update(cx_a, |channel_store, _| {
383            channel_store.remove_channel(channel_id)
384        })
385        .await
386        .unwrap();
387    deterministic.run_until_parked();
388
389    // Channel buffer observed the deletion
390    channel_buffer_b.update(cx_b, |buffer, _| {
391        assert_eq!(
392            buffer.channel().as_ref(),
393            &Channel {
394                id: channel_id,
395                name: "the-channel".to_string()
396            }
397        );
398        assert!(!buffer.is_connected());
399    });
400}
401
402#[gpui::test]
403async fn test_rejoin_channel_buffer(
404    deterministic: Arc<Deterministic>,
405    cx_a: &mut TestAppContext,
406    cx_b: &mut TestAppContext,
407) {
408    deterministic.forbid_parking();
409    let mut server = TestServer::start(&deterministic).await;
410    let client_a = server.create_client(cx_a, "user_a").await;
411    let client_b = server.create_client(cx_b, "user_b").await;
412
413    let channel_id = server
414        .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)])
415        .await;
416
417    let channel_buffer_a = client_a
418        .channel_store()
419        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
420        .await
421        .unwrap();
422    let channel_buffer_b = client_b
423        .channel_store()
424        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
425        .await
426        .unwrap();
427
428    channel_buffer_a.update(cx_a, |buffer, cx| {
429        buffer.buffer().update(cx, |buffer, cx| {
430            buffer.edit([(0..0, "1")], None, cx);
431        })
432    });
433    deterministic.run_until_parked();
434
435    // Client A disconnects.
436    server.forbid_connections();
437    server.disconnect_client(client_a.peer_id().unwrap());
438
439    // Both clients make an edit.
440    channel_buffer_a.update(cx_a, |buffer, cx| {
441        buffer.buffer().update(cx, |buffer, cx| {
442            buffer.edit([(1..1, "2")], None, cx);
443        })
444    });
445    channel_buffer_b.update(cx_b, |buffer, cx| {
446        buffer.buffer().update(cx, |buffer, cx| {
447            buffer.edit([(0..0, "0")], None, cx);
448        })
449    });
450
451    // Both clients see their own edit.
452    deterministic.run_until_parked();
453    channel_buffer_a.read_with(cx_a, |buffer, cx| {
454        assert_eq!(buffer.buffer().read(cx).text(), "12");
455    });
456    channel_buffer_b.read_with(cx_b, |buffer, cx| {
457        assert_eq!(buffer.buffer().read(cx).text(), "01");
458    });
459
460    // Client A reconnects. Both clients see each other's edits, and see
461    // the same collaborators.
462    server.allow_connections();
463    deterministic.advance_clock(RECEIVE_TIMEOUT);
464    channel_buffer_a.read_with(cx_a, |buffer, cx| {
465        assert_eq!(buffer.buffer().read(cx).text(), "012");
466    });
467    channel_buffer_b.read_with(cx_b, |buffer, cx| {
468        assert_eq!(buffer.buffer().read(cx).text(), "012");
469    });
470
471    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
472        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
473            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
474        });
475    });
476}
477
478#[gpui::test]
479async fn test_channel_buffers_and_server_restarts(
480    deterministic: Arc<Deterministic>,
481    cx_a: &mut TestAppContext,
482    cx_b: &mut TestAppContext,
483    cx_c: &mut TestAppContext,
484) {
485    deterministic.forbid_parking();
486    let mut server = TestServer::start(&deterministic).await;
487    let client_a = server.create_client(cx_a, "user_a").await;
488    let client_b = server.create_client(cx_b, "user_b").await;
489    let client_c = server.create_client(cx_c, "user_c").await;
490
491    let channel_id = server
492        .make_channel(
493            "the-channel",
494            (&client_a, cx_a),
495            &mut [(&client_b, cx_b), (&client_c, cx_c)],
496        )
497        .await;
498
499    let channel_buffer_a = client_a
500        .channel_store()
501        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
502        .await
503        .unwrap();
504    let channel_buffer_b = client_b
505        .channel_store()
506        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
507        .await
508        .unwrap();
509    let _channel_buffer_c = client_c
510        .channel_store()
511        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
512        .await
513        .unwrap();
514
515    channel_buffer_a.update(cx_a, |buffer, cx| {
516        buffer.buffer().update(cx, |buffer, cx| {
517            buffer.edit([(0..0, "1")], None, cx);
518        })
519    });
520    deterministic.run_until_parked();
521
522    // Client C can't reconnect.
523    client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
524
525    // Server stops.
526    server.reset().await;
527    deterministic.advance_clock(RECEIVE_TIMEOUT);
528
529    // While the server is down, both clients make an edit.
530    channel_buffer_a.update(cx_a, |buffer, cx| {
531        buffer.buffer().update(cx, |buffer, cx| {
532            buffer.edit([(1..1, "2")], None, cx);
533        })
534    });
535    channel_buffer_b.update(cx_b, |buffer, cx| {
536        buffer.buffer().update(cx, |buffer, cx| {
537            buffer.edit([(0..0, "0")], None, cx);
538        })
539    });
540
541    // Server restarts.
542    server.start().await.unwrap();
543    deterministic.advance_clock(CLEANUP_TIMEOUT);
544
545    // Clients reconnects. Clients A and B see each other's edits, and see
546    // that client C has disconnected.
547    channel_buffer_a.read_with(cx_a, |buffer, cx| {
548        assert_eq!(buffer.buffer().read(cx).text(), "012");
549    });
550    channel_buffer_b.read_with(cx_b, |buffer, cx| {
551        assert_eq!(buffer.buffer().read(cx).text(), "012");
552    });
553
554    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
555        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
556            assert_eq!(
557                buffer_a
558                    .collaborators()
559                    .iter()
560                    .map(|c| c.user_id)
561                    .collect::<Vec<_>>(),
562                vec![client_a.user_id().unwrap(), client_b.user_id().unwrap()]
563            );
564            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
565        });
566    });
567}
568
569#[track_caller]
570fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
571    assert_eq!(
572        collaborators
573            .into_iter()
574            .map(|collaborator| collaborator.user_id)
575            .collect::<Vec<_>>(),
576        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
577    );
578}
579
580fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
581    channel_buffer.read_with(cx, |buffer, _| buffer.text())
582}