channel_buffer_tests.rs

  1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
  2use call::ActiveCall;
  3use channel::Channel;
  4use client::UserId;
  5use collab_ui::channel_view::ChannelView;
  6use collections::HashMap;
  7use futures::future;
  8use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
  9use rpc::{proto, RECEIVE_TIMEOUT};
 10use serde_json::json;
 11use std::sync::Arc;
 12
 13#[gpui::test]
 14async fn test_core_channel_buffers(
 15    deterministic: Arc<Deterministic>,
 16    cx_a: &mut TestAppContext,
 17    cx_b: &mut TestAppContext,
 18) {
 19    deterministic.forbid_parking();
 20    let mut server = TestServer::start(&deterministic).await;
 21    let client_a = server.create_client(cx_a, "user_a").await;
 22    let client_b = server.create_client(cx_b, "user_b").await;
 23
 24    let channel_id = server
 25        .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
 26        .await;
 27
 28    // Client A joins the channel buffer
 29    let channel_buffer_a = client_a
 30        .channel_store()
 31        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 32        .await
 33        .unwrap();
 34
 35    // Client A edits the buffer
 36    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 37    buffer_a.update(cx_a, |buffer, cx| {
 38        buffer.edit([(0..0, "hello world")], None, cx)
 39    });
 40    buffer_a.update(cx_a, |buffer, cx| {
 41        buffer.edit([(5..5, ", cruel")], None, cx)
 42    });
 43    buffer_a.update(cx_a, |buffer, cx| {
 44        buffer.edit([(0..5, "goodbye")], None, cx)
 45    });
 46    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 47    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 48    deterministic.run_until_parked();
 49
 50    // Client B joins the channel buffer
 51    let channel_buffer_b = client_b
 52        .channel_store()
 53        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 54        .await
 55        .unwrap();
 56    channel_buffer_b.read_with(cx_b, |buffer, _| {
 57        assert_collaborators(
 58            buffer.collaborators(),
 59            &[client_a.user_id(), client_b.user_id()],
 60        );
 61    });
 62
 63    // Client B sees the correct text, and then edits it
 64    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 65    assert_eq!(
 66        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 67        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 68    );
 69    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 70    buffer_b.update(cx_b, |buffer, cx| {
 71        buffer.edit([(7..12, "beautiful")], None, cx)
 72    });
 73
 74    // Both A and B see the new edit
 75    deterministic.run_until_parked();
 76    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 77    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 78
 79    // Client A closes the channel buffer.
 80    cx_a.update(|_| drop(channel_buffer_a));
 81    deterministic.run_until_parked();
 82
 83    // Client B sees that client A is gone from the channel buffer.
 84    channel_buffer_b.read_with(cx_b, |buffer, _| {
 85        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
 86    });
 87
 88    // Client A rejoins the channel buffer
 89    let _channel_buffer_a = client_a
 90        .channel_store()
 91        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 92        .await
 93        .unwrap();
 94    deterministic.run_until_parked();
 95
 96    // Sanity test, make sure we saw A rejoining
 97    channel_buffer_b.read_with(cx_b, |buffer, _| {
 98        assert_collaborators(
 99            &buffer.collaborators(),
100            &[client_b.user_id(), client_a.user_id()],
101        );
102    });
103
104    // Client A loses connection.
105    server.forbid_connections();
106    server.disconnect_client(client_a.peer_id().unwrap());
107    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
108
109    // Client B observes A disconnect
110    channel_buffer_b.read_with(cx_b, |buffer, _| {
111        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
112    });
113
114    // TODO:
115    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
116    // - Test interaction with channel deletion while buffer is open
117}
118
119#[gpui::test]
120async fn test_channel_buffer_replica_ids(
121    deterministic: Arc<Deterministic>,
122    cx_a: &mut TestAppContext,
123    cx_b: &mut TestAppContext,
124    cx_c: &mut TestAppContext,
125) {
126    deterministic.forbid_parking();
127    let mut server = TestServer::start(&deterministic).await;
128    let client_a = server.create_client(cx_a, "user_a").await;
129    let client_b = server.create_client(cx_b, "user_b").await;
130    let client_c = server.create_client(cx_c, "user_c").await;
131
132    let channel_id = server
133        .make_channel(
134            "the-channel",
135            (&client_a, cx_a),
136            &mut [(&client_b, cx_b), (&client_c, cx_c)],
137        )
138        .await;
139
140    let active_call_a = cx_a.read(ActiveCall::global);
141    let active_call_b = cx_b.read(ActiveCall::global);
142    let active_call_c = cx_c.read(ActiveCall::global);
143
144    // Clients A and B join a channel.
145    active_call_a
146        .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
147        .await
148        .unwrap();
149    active_call_b
150        .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
151        .await
152        .unwrap();
153
154    // Clients A, B, and C join a channel buffer
155    // C first so that the replica IDs in the project and the channel buffer are different
156    let channel_buffer_c = client_c
157        .channel_store()
158        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
159        .await
160        .unwrap();
161    let channel_buffer_b = client_b
162        .channel_store()
163        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
164        .await
165        .unwrap();
166    let channel_buffer_a = client_a
167        .channel_store()
168        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
169        .await
170        .unwrap();
171
172    // Client B shares a project
173    client_b
174        .fs()
175        .insert_tree("/dir", json!({ "file.txt": "contents" }))
176        .await;
177    let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
178    let shared_project_id = active_call_b
179        .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
180        .await
181        .unwrap();
182
183    // Client A joins the project
184    let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
185    deterministic.run_until_parked();
186
187    // Client C is in a separate project.
188    client_c.fs().insert_tree("/dir", json!({})).await;
189    let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
190
191    // Note that each user has a different replica id in the projects vs the
192    // channel buffer.
193    channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
194        assert_eq!(project_a.read(cx).replica_id(), 1);
195        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
196    });
197    channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
198        assert_eq!(project_b.read(cx).replica_id(), 0);
199        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
200    });
201    channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
202        // C is not in the project
203        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
204    });
205
206    let channel_window_a =
207        cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
208    let channel_window_b =
209        cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
210    let channel_window_c = cx_c.add_window(|cx| {
211        ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
212    });
213
214    let channel_view_a = channel_window_a.root(cx_a);
215    let channel_view_b = channel_window_b.root(cx_b);
216    let channel_view_c = channel_window_c.root(cx_c);
217
218    // For clients A and B, the replica ids in the channel buffer are mapped
219    // so that they match the same users' replica ids in their shared project.
220    channel_view_a.read_with(cx_a, |view, cx| {
221        assert_eq!(
222            view.editor.read(cx).replica_id_map().unwrap(),
223            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
224        );
225    });
226    channel_view_b.read_with(cx_b, |view, cx| {
227        assert_eq!(
228            view.editor.read(cx).replica_id_map().unwrap(),
229            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
230        )
231    });
232
233    // Client C only sees themself, as they're not part of any shared project
234    channel_view_c.read_with(cx_c, |view, cx| {
235        assert_eq!(
236            view.editor.read(cx).replica_id_map().unwrap(),
237            &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
238        );
239    });
240
241    // Client C joins the project that clients A and B are in.
242    active_call_c
243        .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
244        .await
245        .unwrap();
246    let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
247    deterministic.run_until_parked();
248    project_c.read_with(cx_c, |project, _| {
249        assert_eq!(project.replica_id(), 2);
250    });
251
252    // For clients A and B, client C's replica id in the channel buffer is
253    // now mapped to their replica id in the shared project.
254    channel_view_a.read_with(cx_a, |view, cx| {
255        assert_eq!(
256            view.editor.read(cx).replica_id_map().unwrap(),
257            &[(1, 0), (2, 1), (0, 2)]
258                .into_iter()
259                .collect::<HashMap<_, _>>()
260        );
261    });
262    channel_view_b.read_with(cx_b, |view, cx| {
263        assert_eq!(
264            view.editor.read(cx).replica_id_map().unwrap(),
265            &[(1, 0), (2, 1), (0, 2)]
266                .into_iter()
267                .collect::<HashMap<_, _>>(),
268        )
269    });
270}
271
272#[gpui::test]
273async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
274    deterministic.forbid_parking();
275    let mut server = TestServer::start(&deterministic).await;
276    let client_a = server.create_client(cx_a, "user_a").await;
277
278    let channel_id = server
279        .make_channel("the-channel", (&client_a, cx_a), &mut [])
280        .await;
281
282    let channel_buffer_1 = client_a
283        .channel_store()
284        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
285    let channel_buffer_2 = client_a
286        .channel_store()
287        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
288    let channel_buffer_3 = client_a
289        .channel_store()
290        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
291
292    // All concurrent tasks for opening a channel buffer return the same model handle.
293    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
294        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
295            .await
296            .unwrap();
297    let channel_buffer_model_id = channel_buffer.id();
298    assert_eq!(channel_buffer, channel_buffer_2);
299    assert_eq!(channel_buffer, channel_buffer_3);
300
301    channel_buffer.update(cx_a, |buffer, cx| {
302        buffer.buffer().update(cx, |buffer, cx| {
303            buffer.edit([(0..0, "hello")], None, cx);
304        })
305    });
306    deterministic.run_until_parked();
307
308    cx_a.update(|_| {
309        drop(channel_buffer);
310        drop(channel_buffer_2);
311        drop(channel_buffer_3);
312    });
313    deterministic.run_until_parked();
314
315    // The channel buffer can be reopened after dropping it.
316    let channel_buffer = client_a
317        .channel_store()
318        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
319        .await
320        .unwrap();
321    assert_ne!(channel_buffer.id(), channel_buffer_model_id);
322    channel_buffer.update(cx_a, |buffer, cx| {
323        buffer.buffer().update(cx, |buffer, _| {
324            assert_eq!(buffer.text(), "hello");
325        })
326    });
327}
328
329#[gpui::test]
330async fn test_channel_buffer_disconnect(
331    deterministic: Arc<Deterministic>,
332    cx_a: &mut TestAppContext,
333    cx_b: &mut TestAppContext,
334) {
335    deterministic.forbid_parking();
336    let mut server = TestServer::start(&deterministic).await;
337    let client_a = server.create_client(cx_a, "user_a").await;
338    let client_b = server.create_client(cx_b, "user_b").await;
339
340    let channel_id = server
341        .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)])
342        .await;
343
344    let channel_buffer_a = client_a
345        .channel_store()
346        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
347        .await
348        .unwrap();
349    let channel_buffer_b = client_b
350        .channel_store()
351        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
352        .await
353        .unwrap();
354
355    server.forbid_connections();
356    server.disconnect_client(client_a.peer_id().unwrap());
357    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
358
359    channel_buffer_a.update(cx_a, |buffer, _| {
360        assert_eq!(
361            buffer.channel().as_ref(),
362            &Channel {
363                id: channel_id,
364                name: "the-channel".to_string()
365            }
366        );
367        assert!(!buffer.is_connected());
368    });
369
370    deterministic.run_until_parked();
371
372    server.allow_connections();
373    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
374
375    deterministic.run_until_parked();
376
377    client_a
378        .channel_store()
379        .update(cx_a, |channel_store, _| {
380            channel_store.remove_channel(channel_id)
381        })
382        .await
383        .unwrap();
384    deterministic.run_until_parked();
385
386    // Channel buffer observed the deletion
387    channel_buffer_b.update(cx_b, |buffer, _| {
388        assert_eq!(
389            buffer.channel().as_ref(),
390            &Channel {
391                id: channel_id,
392                name: "the-channel".to_string()
393            }
394        );
395        assert!(!buffer.is_connected());
396    });
397}
398
399#[gpui::test]
400async fn test_rejoin_channel_buffer(
401    deterministic: Arc<Deterministic>,
402    cx_a: &mut TestAppContext,
403    cx_b: &mut TestAppContext,
404) {
405    deterministic.forbid_parking();
406    let mut server = TestServer::start(&deterministic).await;
407    let client_a = server.create_client(cx_a, "user_a").await;
408    let client_b = server.create_client(cx_b, "user_b").await;
409
410    let channel_id = server
411        .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)])
412        .await;
413
414    let channel_buffer_a = client_a
415        .channel_store()
416        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
417        .await
418        .unwrap();
419    let channel_buffer_b = client_b
420        .channel_store()
421        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
422        .await
423        .unwrap();
424
425    channel_buffer_a.update(cx_a, |buffer, cx| {
426        buffer.buffer().update(cx, |buffer, cx| {
427            buffer.edit([(0..0, "1")], None, cx);
428        })
429    });
430    deterministic.run_until_parked();
431
432    // Client A disconnects.
433    server.forbid_connections();
434    server.disconnect_client(client_a.peer_id().unwrap());
435    // deterministic.advance_clock(RECEIVE_TIMEOUT);
436
437    // Both clients make an edit. Both clients see their own edit.
438    channel_buffer_a.update(cx_a, |buffer, cx| {
439        buffer.buffer().update(cx, |buffer, cx| {
440            buffer.edit([(1..1, "2")], None, cx);
441        })
442    });
443    channel_buffer_b.update(cx_b, |buffer, cx| {
444        buffer.buffer().update(cx, |buffer, cx| {
445            buffer.edit([(0..0, "0")], None, cx);
446        })
447    });
448    deterministic.run_until_parked();
449    channel_buffer_a.read_with(cx_a, |buffer, cx| {
450        assert_eq!(buffer.buffer().read(cx).text(), "12");
451    });
452    channel_buffer_b.read_with(cx_b, |buffer, cx| {
453        assert_eq!(buffer.buffer().read(cx).text(), "01");
454    });
455
456    // Client A reconnects.
457    server.allow_connections();
458    deterministic.advance_clock(RECEIVE_TIMEOUT);
459    channel_buffer_a.read_with(cx_a, |buffer, cx| {
460        assert_eq!(buffer.buffer().read(cx).text(), "012");
461    });
462    channel_buffer_b.read_with(cx_b, |buffer, cx| {
463        assert_eq!(buffer.buffer().read(cx).text(), "012");
464    });
465}
466
467#[track_caller]
468fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
469    assert_eq!(
470        collaborators
471            .into_iter()
472            .map(|collaborator| collaborator.user_id)
473            .collect::<Vec<_>>(),
474        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
475    );
476}
477
478fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
479    channel_buffer.read_with(cx, |buffer, _| buffer.text())
480}