channel_buffer_tests.rs

  1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
  2use call::ActiveCall;
  3use client::UserId;
  4use collab_ui::channel_view::ChannelView;
  5use collections::HashMap;
  6use futures::future;
  7use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
  8use rpc::{proto, RECEIVE_TIMEOUT};
  9use serde_json::json;
 10use std::sync::Arc;
 11
 12#[gpui::test]
 13async fn test_core_channel_buffers(
 14    deterministic: Arc<Deterministic>,
 15    cx_a: &mut TestAppContext,
 16    cx_b: &mut TestAppContext,
 17) {
 18    deterministic.forbid_parking();
 19    let mut server = TestServer::start(&deterministic).await;
 20    let client_a = server.create_client(cx_a, "user_a").await;
 21    let client_b = server.create_client(cx_b, "user_b").await;
 22
 23    let zed_id = server
 24        .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
 25        .await;
 26
 27    // Client A joins the channel buffer
 28    let channel_buffer_a = client_a
 29        .channel_store()
 30        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
 31        .await
 32        .unwrap();
 33
 34    // Client A edits the buffer
 35    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 36
 37    buffer_a.update(cx_a, |buffer, cx| {
 38        buffer.edit([(0..0, "hello world")], None, cx)
 39    });
 40    buffer_a.update(cx_a, |buffer, cx| {
 41        buffer.edit([(5..5, ", cruel")], None, cx)
 42    });
 43    buffer_a.update(cx_a, |buffer, cx| {
 44        buffer.edit([(0..5, "goodbye")], None, cx)
 45    });
 46    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 47    deterministic.run_until_parked();
 48
 49    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 50
 51    // Client B joins the channel buffer
 52    let channel_buffer_b = client_b
 53        .channel_store()
 54        .update(cx_b, |channel, cx| channel.open_channel_buffer(zed_id, cx))
 55        .await
 56        .unwrap();
 57
 58    channel_buffer_b.read_with(cx_b, |buffer, _| {
 59        assert_collaborators(
 60            buffer.collaborators(),
 61            &[client_a.user_id(), client_b.user_id()],
 62        );
 63    });
 64
 65    // Client B sees the correct text, and then edits it
 66    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 67    assert_eq!(
 68        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 69        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 70    );
 71    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 72    buffer_b.update(cx_b, |buffer, cx| {
 73        buffer.edit([(7..12, "beautiful")], None, cx)
 74    });
 75
 76    // Both A and B see the new edit
 77    deterministic.run_until_parked();
 78    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 79    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 80
 81    // Client A closes the channel buffer.
 82    cx_a.update(|_| drop(channel_buffer_a));
 83    deterministic.run_until_parked();
 84
 85    // Client B sees that client A is gone from the channel buffer.
 86    channel_buffer_b.read_with(cx_b, |buffer, _| {
 87        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
 88    });
 89
 90    // Client A rejoins the channel buffer
 91    let _channel_buffer_a = client_a
 92        .channel_store()
 93        .update(cx_a, |channels, cx| {
 94            channels.open_channel_buffer(zed_id, cx)
 95        })
 96        .await
 97        .unwrap();
 98    deterministic.run_until_parked();
 99
100    // Sanity test, make sure we saw A rejoining
101    channel_buffer_b.read_with(cx_b, |buffer, _| {
102        assert_collaborators(
103            &buffer.collaborators(),
104            &[client_b.user_id(), client_a.user_id()],
105        );
106    });
107
108    // Client A loses connection.
109    server.forbid_connections();
110    server.disconnect_client(client_a.peer_id().unwrap());
111    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
112
113    // Client B observes A disconnect
114    channel_buffer_b.read_with(cx_b, |buffer, _| {
115        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
116    });
117
118    // TODO:
119    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
120    // - Test interaction with channel deletion while buffer is open
121}
122
123#[gpui::test]
124async fn test_channel_buffer_replica_ids(
125    deterministic: Arc<Deterministic>,
126    cx_a: &mut TestAppContext,
127    cx_b: &mut TestAppContext,
128    cx_c: &mut TestAppContext,
129) {
130    deterministic.forbid_parking();
131    let mut server = TestServer::start(&deterministic).await;
132    let client_a = server.create_client(cx_a, "user_a").await;
133    let client_b = server.create_client(cx_b, "user_b").await;
134    let client_c = server.create_client(cx_c, "user_c").await;
135
136    let channel_id = server
137        .make_channel(
138            "zed",
139            (&client_a, cx_a),
140            &mut [(&client_b, cx_b), (&client_c, cx_c)],
141        )
142        .await;
143
144    let active_call_a = cx_a.read(ActiveCall::global);
145    let active_call_b = cx_b.read(ActiveCall::global);
146    let active_call_c = cx_c.read(ActiveCall::global);
147
148    // Clients A and B join a channel.
149    active_call_a
150        .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
151        .await
152        .unwrap();
153    active_call_b
154        .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
155        .await
156        .unwrap();
157
158    // Clients A, B, and C join a channel buffer
159    // C first so that the replica IDs in the project and the channel buffer are different
160    let channel_buffer_c = client_c
161        .channel_store()
162        .update(cx_c, |channel, cx| {
163            channel.open_channel_buffer(channel_id, cx)
164        })
165        .await
166        .unwrap();
167    let channel_buffer_b = client_b
168        .channel_store()
169        .update(cx_b, |channel, cx| {
170            channel.open_channel_buffer(channel_id, cx)
171        })
172        .await
173        .unwrap();
174    let channel_buffer_a = client_a
175        .channel_store()
176        .update(cx_a, |channel, cx| {
177            channel.open_channel_buffer(channel_id, cx)
178        })
179        .await
180        .unwrap();
181
182    // Client B shares a project
183    client_b
184        .fs()
185        .insert_tree("/dir", json!({ "file.txt": "contents" }))
186        .await;
187    let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
188    let shared_project_id = active_call_b
189        .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
190        .await
191        .unwrap();
192
193    // Client A joins the project
194    let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
195    deterministic.run_until_parked();
196
197    // Client C is in a separate project.
198    client_c.fs().insert_tree("/dir", json!({})).await;
199    let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
200
201    // Note that each user has a different replica id in the projects vs the
202    // channel buffer.
203    channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
204        assert_eq!(project_a.read(cx).replica_id(), 1);
205        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
206    });
207    channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
208        assert_eq!(project_b.read(cx).replica_id(), 0);
209        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
210    });
211    channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
212        // C is not in the project
213        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
214    });
215
216    let channel_window_a =
217        cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
218    let channel_window_b =
219        cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
220    let channel_window_c = cx_c.add_window(|cx| {
221        ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
222    });
223
224    let channel_view_a = channel_window_a.root(cx_a);
225    let channel_view_b = channel_window_b.root(cx_b);
226    let channel_view_c = channel_window_c.root(cx_c);
227
228    // For clients A and B, the replica ids in the channel buffer are mapped
229    // so that they match the same users' replica ids in their shared project.
230    channel_view_a.read_with(cx_a, |view, cx| {
231        assert_eq!(
232            view.editor.read(cx).replica_id_map().unwrap(),
233            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
234        );
235    });
236    channel_view_b.read_with(cx_b, |view, cx| {
237        assert_eq!(
238            view.editor.read(cx).replica_id_map().unwrap(),
239            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
240        )
241    });
242
243    // Client C only sees themself, as they're not part of any shared project
244    channel_view_c.read_with(cx_c, |view, cx| {
245        assert_eq!(
246            view.editor.read(cx).replica_id_map().unwrap(),
247            &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
248        );
249    });
250
251    // Client C joins the project that clients A and B are in.
252    active_call_c
253        .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
254        .await
255        .unwrap();
256    let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
257    deterministic.run_until_parked();
258    project_c.read_with(cx_c, |project, _| {
259        assert_eq!(project.replica_id(), 2);
260    });
261
262    // For clients A and B, client C's replica id in the channel buffer is
263    // now mapped to their replica id in the shared project.
264    channel_view_a.read_with(cx_a, |view, cx| {
265        assert_eq!(
266            view.editor.read(cx).replica_id_map().unwrap(),
267            &[(1, 0), (2, 1), (0, 2)]
268                .into_iter()
269                .collect::<HashMap<_, _>>()
270        );
271    });
272    channel_view_b.read_with(cx_b, |view, cx| {
273        assert_eq!(
274            view.editor.read(cx).replica_id_map().unwrap(),
275            &[(1, 0), (2, 1), (0, 2)]
276                .into_iter()
277                .collect::<HashMap<_, _>>(),
278        )
279    });
280}
281
282#[gpui::test]
283async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
284    deterministic.forbid_parking();
285    let mut server = TestServer::start(&deterministic).await;
286    let client_a = server.create_client(cx_a, "user_a").await;
287
288    let zed_id = server.make_channel("zed", (&client_a, cx_a), &mut []).await;
289
290    let channel_buffer_1 = client_a
291        .channel_store()
292        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
293    let channel_buffer_2 = client_a
294        .channel_store()
295        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
296    let channel_buffer_3 = client_a
297        .channel_store()
298        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
299
300    // All concurrent tasks for opening a channel buffer return the same model handle.
301    let (channel_buffer_1, channel_buffer_2, channel_buffer_3) =
302        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
303            .await
304            .unwrap();
305    let model_id = channel_buffer_1.id();
306    assert_eq!(channel_buffer_1, channel_buffer_2);
307    assert_eq!(channel_buffer_1, channel_buffer_3);
308
309    channel_buffer_1.update(cx_a, |buffer, cx| {
310        buffer.buffer().update(cx, |buffer, cx| {
311            buffer.edit([(0..0, "hello")], None, cx);
312        })
313    });
314    deterministic.run_until_parked();
315
316    cx_a.update(|_| {
317        drop(channel_buffer_1);
318        drop(channel_buffer_2);
319        drop(channel_buffer_3);
320    });
321    deterministic.run_until_parked();
322
323    // The channel buffer can be reopened after dropping it.
324    let channel_buffer = client_a
325        .channel_store()
326        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
327        .await
328        .unwrap();
329    assert_ne!(channel_buffer.id(), model_id);
330    channel_buffer.update(cx_a, |buffer, cx| {
331        buffer.buffer().update(cx, |buffer, _| {
332            assert_eq!(buffer.text(), "hello");
333        })
334    });
335}
336
337#[track_caller]
338fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
339    assert_eq!(
340        collaborators
341            .into_iter()
342            .map(|collaborator| collaborator.user_id)
343            .collect::<Vec<_>>(),
344        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
345    );
346}
347
348fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
349    channel_buffer.read_with(cx, |buffer, _| buffer.text())
350}