1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
2use call::ActiveCall;
3use channel::Channel;
4use client::UserId;
5use collab_ui::channel_view::ChannelView;
6use collections::HashMap;
7use futures::future;
8use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
9use rpc::{proto, RECEIVE_TIMEOUT};
10use serde_json::json;
11use std::sync::Arc;
12
13#[gpui::test]
14async fn test_core_channel_buffers(
15 deterministic: Arc<Deterministic>,
16 cx_a: &mut TestAppContext,
17 cx_b: &mut TestAppContext,
18) {
19 deterministic.forbid_parking();
20 let mut server = TestServer::start(&deterministic).await;
21 let client_a = server.create_client(cx_a, "user_a").await;
22 let client_b = server.create_client(cx_b, "user_b").await;
23
24 let zed_id = server
25 .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
26 .await;
27
28 // Client A joins the channel buffer
29 let channel_buffer_a = client_a
30 .channel_store()
31 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
32 .await
33 .unwrap();
34
35 // Client A edits the buffer
36 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
37
38 buffer_a.update(cx_a, |buffer, cx| {
39 buffer.edit([(0..0, "hello world")], None, cx)
40 });
41 buffer_a.update(cx_a, |buffer, cx| {
42 buffer.edit([(5..5, ", cruel")], None, cx)
43 });
44 buffer_a.update(cx_a, |buffer, cx| {
45 buffer.edit([(0..5, "goodbye")], None, cx)
46 });
47 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
48 deterministic.run_until_parked();
49
50 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
51
52 // Client B joins the channel buffer
53 let channel_buffer_b = client_b
54 .channel_store()
55 .update(cx_b, |channel, cx| channel.open_channel_buffer(zed_id, cx))
56 .await
57 .unwrap();
58
59 channel_buffer_b.read_with(cx_b, |buffer, _| {
60 assert_collaborators(
61 buffer.collaborators(),
62 &[client_a.user_id(), client_b.user_id()],
63 );
64 });
65
66 // Client B sees the correct text, and then edits it
67 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
68 assert_eq!(
69 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
70 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
71 );
72 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
73 buffer_b.update(cx_b, |buffer, cx| {
74 buffer.edit([(7..12, "beautiful")], None, cx)
75 });
76
77 // Both A and B see the new edit
78 deterministic.run_until_parked();
79 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
80 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
81
82 // Client A closes the channel buffer.
83 cx_a.update(|_| drop(channel_buffer_a));
84 deterministic.run_until_parked();
85
86 // Client B sees that client A is gone from the channel buffer.
87 channel_buffer_b.read_with(cx_b, |buffer, _| {
88 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
89 });
90
91 // Client A rejoins the channel buffer
92 let _channel_buffer_a = client_a
93 .channel_store()
94 .update(cx_a, |channels, cx| {
95 channels.open_channel_buffer(zed_id, cx)
96 })
97 .await
98 .unwrap();
99 deterministic.run_until_parked();
100
101 // Sanity test, make sure we saw A rejoining
102 channel_buffer_b.read_with(cx_b, |buffer, _| {
103 assert_collaborators(
104 &buffer.collaborators(),
105 &[client_b.user_id(), client_a.user_id()],
106 );
107 });
108
109 // Client A loses connection.
110 server.forbid_connections();
111 server.disconnect_client(client_a.peer_id().unwrap());
112 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
113
114 // Client B observes A disconnect
115 channel_buffer_b.read_with(cx_b, |buffer, _| {
116 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
117 });
118
119 // TODO:
120 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
121 // - Test interaction with channel deletion while buffer is open
122}
123
124#[gpui::test]
125async fn test_channel_buffer_replica_ids(
126 deterministic: Arc<Deterministic>,
127 cx_a: &mut TestAppContext,
128 cx_b: &mut TestAppContext,
129 cx_c: &mut TestAppContext,
130) {
131 deterministic.forbid_parking();
132 let mut server = TestServer::start(&deterministic).await;
133 let client_a = server.create_client(cx_a, "user_a").await;
134 let client_b = server.create_client(cx_b, "user_b").await;
135 let client_c = server.create_client(cx_c, "user_c").await;
136
137 let channel_id = server
138 .make_channel(
139 "zed",
140 (&client_a, cx_a),
141 &mut [(&client_b, cx_b), (&client_c, cx_c)],
142 )
143 .await;
144
145 let active_call_a = cx_a.read(ActiveCall::global);
146 let active_call_b = cx_b.read(ActiveCall::global);
147 let active_call_c = cx_c.read(ActiveCall::global);
148
149 // Clients A and B join a channel.
150 active_call_a
151 .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
152 .await
153 .unwrap();
154 active_call_b
155 .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
156 .await
157 .unwrap();
158
159 // Clients A, B, and C join a channel buffer
160 // C first so that the replica IDs in the project and the channel buffer are different
161 let channel_buffer_c = client_c
162 .channel_store()
163 .update(cx_c, |channel, cx| {
164 channel.open_channel_buffer(channel_id, cx)
165 })
166 .await
167 .unwrap();
168 let channel_buffer_b = client_b
169 .channel_store()
170 .update(cx_b, |channel, cx| {
171 channel.open_channel_buffer(channel_id, cx)
172 })
173 .await
174 .unwrap();
175 let channel_buffer_a = client_a
176 .channel_store()
177 .update(cx_a, |channel, cx| {
178 channel.open_channel_buffer(channel_id, cx)
179 })
180 .await
181 .unwrap();
182
183 // Client B shares a project
184 client_b
185 .fs()
186 .insert_tree("/dir", json!({ "file.txt": "contents" }))
187 .await;
188 let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
189 let shared_project_id = active_call_b
190 .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
191 .await
192 .unwrap();
193
194 // Client A joins the project
195 let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
196 deterministic.run_until_parked();
197
198 // Client C is in a separate project.
199 client_c.fs().insert_tree("/dir", json!({})).await;
200 let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
201
202 // Note that each user has a different replica id in the projects vs the
203 // channel buffer.
204 channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
205 assert_eq!(project_a.read(cx).replica_id(), 1);
206 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
207 });
208 channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
209 assert_eq!(project_b.read(cx).replica_id(), 0);
210 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
211 });
212 channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
213 // C is not in the project
214 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
215 });
216
217 let channel_window_a =
218 cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
219 let channel_window_b =
220 cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
221 let channel_window_c = cx_c.add_window(|cx| {
222 ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
223 });
224
225 let channel_view_a = channel_window_a.root(cx_a);
226 let channel_view_b = channel_window_b.root(cx_b);
227 let channel_view_c = channel_window_c.root(cx_c);
228
229 // For clients A and B, the replica ids in the channel buffer are mapped
230 // so that they match the same users' replica ids in their shared project.
231 channel_view_a.read_with(cx_a, |view, cx| {
232 assert_eq!(
233 view.editor.read(cx).replica_id_map().unwrap(),
234 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
235 );
236 });
237 channel_view_b.read_with(cx_b, |view, cx| {
238 assert_eq!(
239 view.editor.read(cx).replica_id_map().unwrap(),
240 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
241 )
242 });
243
244 // Client C only sees themself, as they're not part of any shared project
245 channel_view_c.read_with(cx_c, |view, cx| {
246 assert_eq!(
247 view.editor.read(cx).replica_id_map().unwrap(),
248 &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
249 );
250 });
251
252 // Client C joins the project that clients A and B are in.
253 active_call_c
254 .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
255 .await
256 .unwrap();
257 let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
258 deterministic.run_until_parked();
259 project_c.read_with(cx_c, |project, _| {
260 assert_eq!(project.replica_id(), 2);
261 });
262
263 // For clients A and B, client C's replica id in the channel buffer is
264 // now mapped to their replica id in the shared project.
265 channel_view_a.read_with(cx_a, |view, cx| {
266 assert_eq!(
267 view.editor.read(cx).replica_id_map().unwrap(),
268 &[(1, 0), (2, 1), (0, 2)]
269 .into_iter()
270 .collect::<HashMap<_, _>>()
271 );
272 });
273 channel_view_b.read_with(cx_b, |view, cx| {
274 assert_eq!(
275 view.editor.read(cx).replica_id_map().unwrap(),
276 &[(1, 0), (2, 1), (0, 2)]
277 .into_iter()
278 .collect::<HashMap<_, _>>(),
279 )
280 });
281}
282
283#[gpui::test]
284async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
285 deterministic.forbid_parking();
286 let mut server = TestServer::start(&deterministic).await;
287 let client_a = server.create_client(cx_a, "user_a").await;
288
289 let zed_id = server.make_channel("zed", (&client_a, cx_a), &mut []).await;
290
291 let channel_buffer_1 = client_a
292 .channel_store()
293 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
294 let channel_buffer_2 = client_a
295 .channel_store()
296 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
297 let channel_buffer_3 = client_a
298 .channel_store()
299 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
300
301 // All concurrent tasks for opening a channel buffer return the same model handle.
302 let (channel_buffer_1, channel_buffer_2, channel_buffer_3) =
303 future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
304 .await
305 .unwrap();
306 let model_id = channel_buffer_1.id();
307 assert_eq!(channel_buffer_1, channel_buffer_2);
308 assert_eq!(channel_buffer_1, channel_buffer_3);
309
310 channel_buffer_1.update(cx_a, |buffer, cx| {
311 buffer.buffer().update(cx, |buffer, cx| {
312 buffer.edit([(0..0, "hello")], None, cx);
313 })
314 });
315 deterministic.run_until_parked();
316
317 cx_a.update(|_| {
318 drop(channel_buffer_1);
319 drop(channel_buffer_2);
320 drop(channel_buffer_3);
321 });
322 deterministic.run_until_parked();
323
324 // The channel buffer can be reopened after dropping it.
325 let channel_buffer = client_a
326 .channel_store()
327 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
328 .await
329 .unwrap();
330 assert_ne!(channel_buffer.id(), model_id);
331 channel_buffer.update(cx_a, |buffer, cx| {
332 buffer.buffer().update(cx, |buffer, _| {
333 assert_eq!(buffer.text(), "hello");
334 })
335 });
336}
337
338#[gpui::test]
339async fn test_channel_buffer_disconnect(
340 deterministic: Arc<Deterministic>,
341 cx_a: &mut TestAppContext,
342 cx_b: &mut TestAppContext,
343) {
344 deterministic.forbid_parking();
345 let mut server = TestServer::start(&deterministic).await;
346 let client_a = server.create_client(cx_a, "user_a").await;
347 let client_b = server.create_client(cx_b, "user_b").await;
348
349 let channel_id = server
350 .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
351 .await;
352
353 let channel_buffer_a = client_a
354 .channel_store()
355 .update(cx_a, |channel, cx| {
356 channel.open_channel_buffer(channel_id, cx)
357 })
358 .await
359 .unwrap();
360
361 let channel_buffer_b = client_b
362 .channel_store()
363 .update(cx_b, |channel, cx| {
364 channel.open_channel_buffer(channel_id, cx)
365 })
366 .await
367 .unwrap();
368
369 server.forbid_connections();
370 server.disconnect_client(client_a.peer_id().unwrap());
371 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
372
373 channel_buffer_a.update(cx_a, |buffer, _| {
374 assert_eq!(
375 buffer.channel().as_ref(),
376 &Channel {
377 id: channel_id,
378 name: "zed".to_string()
379 }
380 );
381 assert!(!buffer.is_connected());
382 });
383
384 deterministic.run_until_parked();
385
386 server.allow_connections();
387 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
388
389 deterministic.run_until_parked();
390
391 client_a
392 .channel_store()
393 .update(cx_a, |channel_store, _| {
394 channel_store.remove_channel(channel_id)
395 })
396 .await
397 .unwrap();
398 deterministic.run_until_parked();
399
400 // Channel buffer observed the deletion
401 channel_buffer_b.update(cx_b, |buffer, _| {
402 assert_eq!(
403 buffer.channel().as_ref(),
404 &Channel {
405 id: channel_id,
406 name: "zed".to_string()
407 }
408 );
409 assert!(!buffer.is_connected());
410 });
411}
412
413#[track_caller]
414fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
415 assert_eq!(
416 collaborators
417 .into_iter()
418 .map(|collaborator| collaborator.user_id)
419 .collect::<Vec<_>>(),
420 ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
421 );
422}
423
424fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
425 channel_buffer.read_with(cx, |buffer, _| buffer.text())
426}