channel_buffer_tests.rs

  1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
  2use call::ActiveCall;
  3use channel::Channel;
  4use client::UserId;
  5use collab_ui::channel_view::ChannelView;
  6use collections::HashMap;
  7use futures::future;
  8use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
  9use rpc::{proto, RECEIVE_TIMEOUT};
 10use serde_json::json;
 11use std::sync::Arc;
 12
 13#[gpui::test]
 14async fn test_core_channel_buffers(
 15    deterministic: Arc<Deterministic>,
 16    cx_a: &mut TestAppContext,
 17    cx_b: &mut TestAppContext,
 18) {
 19    deterministic.forbid_parking();
 20    let mut server = TestServer::start(&deterministic).await;
 21    let client_a = server.create_client(cx_a, "user_a").await;
 22    let client_b = server.create_client(cx_b, "user_b").await;
 23
 24    let zed_id = server
 25        .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
 26        .await;
 27
 28    // Client A joins the channel buffer
 29    let channel_buffer_a = client_a
 30        .channel_store()
 31        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
 32        .await
 33        .unwrap();
 34
 35    // Client A edits the buffer
 36    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 37
 38    buffer_a.update(cx_a, |buffer, cx| {
 39        buffer.edit([(0..0, "hello world")], None, cx)
 40    });
 41    buffer_a.update(cx_a, |buffer, cx| {
 42        buffer.edit([(5..5, ", cruel")], None, cx)
 43    });
 44    buffer_a.update(cx_a, |buffer, cx| {
 45        buffer.edit([(0..5, "goodbye")], None, cx)
 46    });
 47    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 48    deterministic.run_until_parked();
 49
 50    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 51
 52    // Client B joins the channel buffer
 53    let channel_buffer_b = client_b
 54        .channel_store()
 55        .update(cx_b, |channel, cx| channel.open_channel_buffer(zed_id, cx))
 56        .await
 57        .unwrap();
 58
 59    channel_buffer_b.read_with(cx_b, |buffer, _| {
 60        assert_collaborators(
 61            buffer.collaborators(),
 62            &[client_a.user_id(), client_b.user_id()],
 63        );
 64    });
 65
 66    // Client B sees the correct text, and then edits it
 67    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 68    assert_eq!(
 69        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 70        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 71    );
 72    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 73    buffer_b.update(cx_b, |buffer, cx| {
 74        buffer.edit([(7..12, "beautiful")], None, cx)
 75    });
 76
 77    // Both A and B see the new edit
 78    deterministic.run_until_parked();
 79    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 80    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 81
 82    // Client A closes the channel buffer.
 83    cx_a.update(|_| drop(channel_buffer_a));
 84    deterministic.run_until_parked();
 85
 86    // Client B sees that client A is gone from the channel buffer.
 87    channel_buffer_b.read_with(cx_b, |buffer, _| {
 88        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
 89    });
 90
 91    // Client A rejoins the channel buffer
 92    let _channel_buffer_a = client_a
 93        .channel_store()
 94        .update(cx_a, |channels, cx| {
 95            channels.open_channel_buffer(zed_id, cx)
 96        })
 97        .await
 98        .unwrap();
 99    deterministic.run_until_parked();
100
101    // Sanity test, make sure we saw A rejoining
102    channel_buffer_b.read_with(cx_b, |buffer, _| {
103        assert_collaborators(
104            &buffer.collaborators(),
105            &[client_b.user_id(), client_a.user_id()],
106        );
107    });
108
109    // Client A loses connection.
110    server.forbid_connections();
111    server.disconnect_client(client_a.peer_id().unwrap());
112    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
113
114    // Client B observes A disconnect
115    channel_buffer_b.read_with(cx_b, |buffer, _| {
116        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
117    });
118
119    // TODO:
120    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
121    // - Test interaction with channel deletion while buffer is open
122}
123
124#[gpui::test]
125async fn test_channel_buffer_replica_ids(
126    deterministic: Arc<Deterministic>,
127    cx_a: &mut TestAppContext,
128    cx_b: &mut TestAppContext,
129    cx_c: &mut TestAppContext,
130) {
131    deterministic.forbid_parking();
132    let mut server = TestServer::start(&deterministic).await;
133    let client_a = server.create_client(cx_a, "user_a").await;
134    let client_b = server.create_client(cx_b, "user_b").await;
135    let client_c = server.create_client(cx_c, "user_c").await;
136
137    let channel_id = server
138        .make_channel(
139            "zed",
140            (&client_a, cx_a),
141            &mut [(&client_b, cx_b), (&client_c, cx_c)],
142        )
143        .await;
144
145    let active_call_a = cx_a.read(ActiveCall::global);
146    let active_call_b = cx_b.read(ActiveCall::global);
147    let active_call_c = cx_c.read(ActiveCall::global);
148
149    // Clients A and B join a channel.
150    active_call_a
151        .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
152        .await
153        .unwrap();
154    active_call_b
155        .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
156        .await
157        .unwrap();
158
159    // Clients A, B, and C join a channel buffer
160    // C first so that the replica IDs in the project and the channel buffer are different
161    let channel_buffer_c = client_c
162        .channel_store()
163        .update(cx_c, |channel, cx| {
164            channel.open_channel_buffer(channel_id, cx)
165        })
166        .await
167        .unwrap();
168    let channel_buffer_b = client_b
169        .channel_store()
170        .update(cx_b, |channel, cx| {
171            channel.open_channel_buffer(channel_id, cx)
172        })
173        .await
174        .unwrap();
175    let channel_buffer_a = client_a
176        .channel_store()
177        .update(cx_a, |channel, cx| {
178            channel.open_channel_buffer(channel_id, cx)
179        })
180        .await
181        .unwrap();
182
183    // Client B shares a project
184    client_b
185        .fs()
186        .insert_tree("/dir", json!({ "file.txt": "contents" }))
187        .await;
188    let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
189    let shared_project_id = active_call_b
190        .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
191        .await
192        .unwrap();
193
194    // Client A joins the project
195    let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
196    deterministic.run_until_parked();
197
198    // Client C is in a separate project.
199    client_c.fs().insert_tree("/dir", json!({})).await;
200    let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
201
202    // Note that each user has a different replica id in the projects vs the
203    // channel buffer.
204    channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
205        assert_eq!(project_a.read(cx).replica_id(), 1);
206        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
207    });
208    channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
209        assert_eq!(project_b.read(cx).replica_id(), 0);
210        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
211    });
212    channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
213        // C is not in the project
214        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
215    });
216
217    let channel_window_a =
218        cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
219    let channel_window_b =
220        cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
221    let channel_window_c = cx_c.add_window(|cx| {
222        ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
223    });
224
225    let channel_view_a = channel_window_a.root(cx_a);
226    let channel_view_b = channel_window_b.root(cx_b);
227    let channel_view_c = channel_window_c.root(cx_c);
228
229    // For clients A and B, the replica ids in the channel buffer are mapped
230    // so that they match the same users' replica ids in their shared project.
231    channel_view_a.read_with(cx_a, |view, cx| {
232        assert_eq!(
233            view.editor.read(cx).replica_id_map().unwrap(),
234            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
235        );
236    });
237    channel_view_b.read_with(cx_b, |view, cx| {
238        assert_eq!(
239            view.editor.read(cx).replica_id_map().unwrap(),
240            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
241        )
242    });
243
244    // Client C only sees themself, as they're not part of any shared project
245    channel_view_c.read_with(cx_c, |view, cx| {
246        assert_eq!(
247            view.editor.read(cx).replica_id_map().unwrap(),
248            &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
249        );
250    });
251
252    // Client C joins the project that clients A and B are in.
253    active_call_c
254        .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
255        .await
256        .unwrap();
257    let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
258    deterministic.run_until_parked();
259    project_c.read_with(cx_c, |project, _| {
260        assert_eq!(project.replica_id(), 2);
261    });
262
263    // For clients A and B, client C's replica id in the channel buffer is
264    // now mapped to their replica id in the shared project.
265    channel_view_a.read_with(cx_a, |view, cx| {
266        assert_eq!(
267            view.editor.read(cx).replica_id_map().unwrap(),
268            &[(1, 0), (2, 1), (0, 2)]
269                .into_iter()
270                .collect::<HashMap<_, _>>()
271        );
272    });
273    channel_view_b.read_with(cx_b, |view, cx| {
274        assert_eq!(
275            view.editor.read(cx).replica_id_map().unwrap(),
276            &[(1, 0), (2, 1), (0, 2)]
277                .into_iter()
278                .collect::<HashMap<_, _>>(),
279        )
280    });
281}
282
283#[gpui::test]
284async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
285    deterministic.forbid_parking();
286    let mut server = TestServer::start(&deterministic).await;
287    let client_a = server.create_client(cx_a, "user_a").await;
288
289    let zed_id = server.make_channel("zed", (&client_a, cx_a), &mut []).await;
290
291    let channel_buffer_1 = client_a
292        .channel_store()
293        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
294    let channel_buffer_2 = client_a
295        .channel_store()
296        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
297    let channel_buffer_3 = client_a
298        .channel_store()
299        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
300
301    // All concurrent tasks for opening a channel buffer return the same model handle.
302    let (channel_buffer_1, channel_buffer_2, channel_buffer_3) =
303        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
304            .await
305            .unwrap();
306    let model_id = channel_buffer_1.id();
307    assert_eq!(channel_buffer_1, channel_buffer_2);
308    assert_eq!(channel_buffer_1, channel_buffer_3);
309
310    channel_buffer_1.update(cx_a, |buffer, cx| {
311        buffer.buffer().update(cx, |buffer, cx| {
312            buffer.edit([(0..0, "hello")], None, cx);
313        })
314    });
315    deterministic.run_until_parked();
316
317    cx_a.update(|_| {
318        drop(channel_buffer_1);
319        drop(channel_buffer_2);
320        drop(channel_buffer_3);
321    });
322    deterministic.run_until_parked();
323
324    // The channel buffer can be reopened after dropping it.
325    let channel_buffer = client_a
326        .channel_store()
327        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
328        .await
329        .unwrap();
330    assert_ne!(channel_buffer.id(), model_id);
331    channel_buffer.update(cx_a, |buffer, cx| {
332        buffer.buffer().update(cx, |buffer, _| {
333            assert_eq!(buffer.text(), "hello");
334        })
335    });
336}
337
338#[gpui::test]
339async fn test_channel_buffer_disconnect(
340    deterministic: Arc<Deterministic>,
341    cx_a: &mut TestAppContext,
342    cx_b: &mut TestAppContext,
343) {
344    deterministic.forbid_parking();
345    let mut server = TestServer::start(&deterministic).await;
346    let client_a = server.create_client(cx_a, "user_a").await;
347    let client_b = server.create_client(cx_b, "user_b").await;
348
349    let channel_id = server
350        .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
351        .await;
352
353    let channel_buffer_a = client_a
354        .channel_store()
355        .update(cx_a, |channel, cx| {
356            channel.open_channel_buffer(channel_id, cx)
357        })
358        .await
359        .unwrap();
360
361    let channel_buffer_b = client_b
362        .channel_store()
363        .update(cx_b, |channel, cx| {
364            channel.open_channel_buffer(channel_id, cx)
365        })
366        .await
367        .unwrap();
368
369    server.forbid_connections();
370    server.disconnect_client(client_a.peer_id().unwrap());
371    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
372
373    channel_buffer_a.update(cx_a, |buffer, _| {
374        assert_eq!(
375            buffer.channel().as_ref(),
376            &Channel {
377                id: channel_id,
378                name: "zed".to_string()
379            }
380        );
381        assert!(!buffer.is_connected());
382    });
383
384    deterministic.run_until_parked();
385
386    server.allow_connections();
387    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
388
389    deterministic.run_until_parked();
390
391    client_a
392        .channel_store()
393        .update(cx_a, |channel_store, _| {
394            channel_store.remove_channel(channel_id)
395        })
396        .await
397        .unwrap();
398    deterministic.run_until_parked();
399
400    // Channel buffer observed the deletion
401    channel_buffer_b.update(cx_b, |buffer, _| {
402        assert_eq!(
403            buffer.channel().as_ref(),
404            &Channel {
405                id: channel_id,
406                name: "zed".to_string()
407            }
408        );
409        assert!(!buffer.is_connected());
410    });
411}
412
413#[track_caller]
414fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
415    assert_eq!(
416        collaborators
417            .into_iter()
418            .map(|collaborator| collaborator.user_id)
419            .collect::<Vec<_>>(),
420        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
421    );
422}
423
424fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
425    channel_buffer.read_with(cx, |buffer, _| buffer.text())
426}