channel_buffer_tests.rs

  1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
  2use call::ActiveCall;
  3use client::UserId;
  4use collab_ui::channel_view::ChannelView;
  5use collections::HashMap;
  6use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
  7use rpc::{proto, RECEIVE_TIMEOUT};
  8use serde_json::json;
  9use std::sync::Arc;
 10
 11#[gpui::test]
 12async fn test_core_channel_buffers(
 13    deterministic: Arc<Deterministic>,
 14    cx_a: &mut TestAppContext,
 15    cx_b: &mut TestAppContext,
 16) {
 17    deterministic.forbid_parking();
 18    let mut server = TestServer::start(&deterministic).await;
 19    let client_a = server.create_client(cx_a, "user_a").await;
 20    let client_b = server.create_client(cx_b, "user_b").await;
 21
 22    let zed_id = server
 23        .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
 24        .await;
 25
 26    // Client A joins the channel buffer
 27    let channel_buffer_a = client_a
 28        .channel_store()
 29        .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
 30        .await
 31        .unwrap();
 32
 33    // Client A edits the buffer
 34    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 35
 36    buffer_a.update(cx_a, |buffer, cx| {
 37        buffer.edit([(0..0, "hello world")], None, cx)
 38    });
 39    buffer_a.update(cx_a, |buffer, cx| {
 40        buffer.edit([(5..5, ", cruel")], None, cx)
 41    });
 42    buffer_a.update(cx_a, |buffer, cx| {
 43        buffer.edit([(0..5, "goodbye")], None, cx)
 44    });
 45    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 46    deterministic.run_until_parked();
 47
 48    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 49
 50    // Client B joins the channel buffer
 51    let channel_buffer_b = client_b
 52        .channel_store()
 53        .update(cx_b, |channel, cx| channel.open_channel_buffer(zed_id, cx))
 54        .await
 55        .unwrap();
 56
 57    channel_buffer_b.read_with(cx_b, |buffer, _| {
 58        assert_collaborators(
 59            buffer.collaborators(),
 60            &[client_a.user_id(), client_b.user_id()],
 61        );
 62    });
 63
 64    // Client B sees the correct text, and then edits it
 65    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 66    assert_eq!(
 67        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 68        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 69    );
 70    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 71    buffer_b.update(cx_b, |buffer, cx| {
 72        buffer.edit([(7..12, "beautiful")], None, cx)
 73    });
 74
 75    // Both A and B see the new edit
 76    deterministic.run_until_parked();
 77    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 78    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 79
 80    // Client A closes the channel buffer.
 81    cx_a.update(|_| drop(channel_buffer_a));
 82    deterministic.run_until_parked();
 83
 84    // Client B sees that client A is gone from the channel buffer.
 85    channel_buffer_b.read_with(cx_b, |buffer, _| {
 86        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
 87    });
 88
 89    // Client A rejoins the channel buffer
 90    let _channel_buffer_a = client_a
 91        .channel_store()
 92        .update(cx_a, |channels, cx| {
 93            channels.open_channel_buffer(zed_id, cx)
 94        })
 95        .await
 96        .unwrap();
 97    deterministic.run_until_parked();
 98
 99    // Sanity test, make sure we saw A rejoining
100    channel_buffer_b.read_with(cx_b, |buffer, _| {
101        assert_collaborators(
102            &buffer.collaborators(),
103            &[client_b.user_id(), client_a.user_id()],
104        );
105    });
106
107    // Client A loses connection.
108    server.forbid_connections();
109    server.disconnect_client(client_a.peer_id().unwrap());
110    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
111
112    // Client B observes A disconnect
113    channel_buffer_b.read_with(cx_b, |buffer, _| {
114        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
115    });
116
117    // TODO:
118    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
119    // - Test interaction with channel deletion while buffer is open
120}
121
122#[gpui::test]
123async fn test_channel_buffer_replica_ids(
124    deterministic: Arc<Deterministic>,
125    cx_a: &mut TestAppContext,
126    cx_b: &mut TestAppContext,
127    cx_c: &mut TestAppContext,
128) {
129    deterministic.forbid_parking();
130    let mut server = TestServer::start(&deterministic).await;
131    let client_a = server.create_client(cx_a, "user_a").await;
132    let client_b = server.create_client(cx_b, "user_b").await;
133    let client_c = server.create_client(cx_c, "user_c").await;
134
135    let channel_id = server
136        .make_channel(
137            "zed",
138            (&client_a, cx_a),
139            &mut [(&client_b, cx_b), (&client_c, cx_c)],
140        )
141        .await;
142
143    let active_call_a = cx_a.read(ActiveCall::global);
144    let active_call_b = cx_b.read(ActiveCall::global);
145    let active_call_c = cx_c.read(ActiveCall::global);
146
147    // Clients A and B join a channel.
148    active_call_a
149        .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
150        .await
151        .unwrap();
152    active_call_b
153        .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
154        .await
155        .unwrap();
156
157    // Clients A, B, and C join a channel buffer
158    // C first so that the replica IDs in the project and the channel buffer are different
159    let channel_buffer_c = client_c
160        .channel_store()
161        .update(cx_c, |channel, cx| {
162            channel.open_channel_buffer(channel_id, cx)
163        })
164        .await
165        .unwrap();
166    let channel_buffer_b = client_b
167        .channel_store()
168        .update(cx_b, |channel, cx| {
169            channel.open_channel_buffer(channel_id, cx)
170        })
171        .await
172        .unwrap();
173    let channel_buffer_a = client_a
174        .channel_store()
175        .update(cx_a, |channel, cx| {
176            channel.open_channel_buffer(channel_id, cx)
177        })
178        .await
179        .unwrap();
180
181    // Client B shares a project
182    client_b
183        .fs()
184        .insert_tree("/dir", json!({ "file.txt": "contents" }))
185        .await;
186    let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
187    let shared_project_id = active_call_b
188        .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
189        .await
190        .unwrap();
191
192    // Client A joins the project
193    let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
194    deterministic.run_until_parked();
195
196    // Client C is in a separate project.
197    client_c.fs().insert_tree("/dir", json!({})).await;
198    let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
199
200    // Note that each user has a different replica id in the projects vs the
201    // channel buffer.
202    channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
203        assert_eq!(project_a.read(cx).replica_id(), 1);
204        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
205    });
206    channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
207        assert_eq!(project_b.read(cx).replica_id(), 0);
208        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
209    });
210    channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
211        // C is not in the project
212        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
213    });
214
215    let channel_window_a = cx_a
216        .add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), None, cx));
217    let channel_window_b = cx_b
218        .add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), None, cx));
219    let channel_window_c = cx_c.add_window(|cx| {
220        ChannelView::new(
221            separate_project_c.clone(),
222            channel_buffer_c.clone(),
223            None,
224            cx,
225        )
226    });
227
228    let channel_view_a = channel_window_a.root(cx_a);
229    let channel_view_b = channel_window_b.root(cx_b);
230    let channel_view_c = channel_window_c.root(cx_c);
231
232    // For clients A and B, the replica ids in the channel buffer are mapped
233    // so that they match the same users' replica ids in their shared project.
234    channel_view_a.read_with(cx_a, |view, cx| {
235        assert_eq!(
236            view.editor.read(cx).replica_id_map().unwrap(),
237            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
238        );
239    });
240    channel_view_b.read_with(cx_b, |view, cx| {
241        assert_eq!(
242            view.editor.read(cx).replica_id_map().unwrap(),
243            &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
244        )
245    });
246
247    // Client C only sees themself, as they're not part of any shared project
248    channel_view_c.read_with(cx_c, |view, cx| {
249        assert_eq!(
250            view.editor.read(cx).replica_id_map().unwrap(),
251            &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
252        );
253    });
254
255    // Client C joins the project that clients A and B are in.
256    active_call_c
257        .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
258        .await
259        .unwrap();
260    let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
261    deterministic.run_until_parked();
262    project_c.read_with(cx_c, |project, _| {
263        assert_eq!(project.replica_id(), 2);
264    });
265
266    // For clients A and B, client C's replica id in the channel buffer is
267    // now mapped to their replica id in the shared project.
268    channel_view_a.read_with(cx_a, |view, cx| {
269        assert_eq!(
270            view.editor.read(cx).replica_id_map().unwrap(),
271            &[(1, 0), (2, 1), (0, 2)]
272                .into_iter()
273                .collect::<HashMap<_, _>>()
274        );
275    });
276    channel_view_b.read_with(cx_b, |view, cx| {
277        assert_eq!(
278            view.editor.read(cx).replica_id_map().unwrap(),
279            &[(1, 0), (2, 1), (0, 2)]
280                .into_iter()
281                .collect::<HashMap<_, _>>(),
282        )
283    });
284}
285
286#[track_caller]
287fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
288    assert_eq!(
289        collaborators
290            .into_iter()
291            .map(|collaborator| collaborator.user_id)
292            .collect::<Vec<_>>(),
293        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
294    );
295}
296
297fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
298    channel_buffer.read_with(cx, |buffer, _| buffer.text())
299}