1use crate::{
2 rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
3 tests::TestServer,
4};
5use call::ActiveCall;
6use channel::Channel;
7use client::UserId;
8use collab_ui::channel_view::ChannelView;
9use collections::HashMap;
10use futures::future;
11use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
12use rpc::{proto, RECEIVE_TIMEOUT};
13use serde_json::json;
14use std::sync::Arc;
15
16#[gpui::test]
17async fn test_core_channel_buffers(
18 deterministic: Arc<Deterministic>,
19 cx_a: &mut TestAppContext,
20 cx_b: &mut TestAppContext,
21) {
22 deterministic.forbid_parking();
23 let mut server = TestServer::start(&deterministic).await;
24 let client_a = server.create_client(cx_a, "user_a").await;
25 let client_b = server.create_client(cx_b, "user_b").await;
26
27 let channel_id = server
28 .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
29 .await;
30
31 // Client A joins the channel buffer
32 let channel_buffer_a = client_a
33 .channel_store()
34 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
35 .await
36 .unwrap();
37
38 // Client A edits the buffer
39 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
40 buffer_a.update(cx_a, |buffer, cx| {
41 buffer.edit([(0..0, "hello world")], None, cx)
42 });
43 buffer_a.update(cx_a, |buffer, cx| {
44 buffer.edit([(5..5, ", cruel")], None, cx)
45 });
46 buffer_a.update(cx_a, |buffer, cx| {
47 buffer.edit([(0..5, "goodbye")], None, cx)
48 });
49 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
50 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
51 deterministic.run_until_parked();
52
53 // Client B joins the channel buffer
54 let channel_buffer_b = client_b
55 .channel_store()
56 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
57 .await
58 .unwrap();
59 channel_buffer_b.read_with(cx_b, |buffer, _| {
60 assert_collaborators(
61 buffer.collaborators(),
62 &[client_a.user_id(), client_b.user_id()],
63 );
64 });
65
66 // Client B sees the correct text, and then edits it
67 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
68 assert_eq!(
69 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
70 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
71 );
72 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
73 buffer_b.update(cx_b, |buffer, cx| {
74 buffer.edit([(7..12, "beautiful")], None, cx)
75 });
76
77 // Both A and B see the new edit
78 deterministic.run_until_parked();
79 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
80 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
81
82 // Client A closes the channel buffer.
83 cx_a.update(|_| drop(channel_buffer_a));
84 deterministic.run_until_parked();
85
86 // Client B sees that client A is gone from the channel buffer.
87 channel_buffer_b.read_with(cx_b, |buffer, _| {
88 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
89 });
90
91 // Client A rejoins the channel buffer
92 let _channel_buffer_a = client_a
93 .channel_store()
94 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
95 .await
96 .unwrap();
97 deterministic.run_until_parked();
98
99 // Sanity test, make sure we saw A rejoining
100 channel_buffer_b.read_with(cx_b, |buffer, _| {
101 assert_collaborators(
102 &buffer.collaborators(),
103 &[client_b.user_id(), client_a.user_id()],
104 );
105 });
106
107 // Client A loses connection.
108 server.forbid_connections();
109 server.disconnect_client(client_a.peer_id().unwrap());
110 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
111
112 // Client B observes A disconnect
113 channel_buffer_b.read_with(cx_b, |buffer, _| {
114 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
115 });
116
117 // TODO:
118 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
119 // - Test interaction with channel deletion while buffer is open
120}
121
122#[gpui::test]
123async fn test_channel_buffer_replica_ids(
124 deterministic: Arc<Deterministic>,
125 cx_a: &mut TestAppContext,
126 cx_b: &mut TestAppContext,
127 cx_c: &mut TestAppContext,
128) {
129 deterministic.forbid_parking();
130 let mut server = TestServer::start(&deterministic).await;
131 let client_a = server.create_client(cx_a, "user_a").await;
132 let client_b = server.create_client(cx_b, "user_b").await;
133 let client_c = server.create_client(cx_c, "user_c").await;
134
135 let channel_id = server
136 .make_channel(
137 "the-channel",
138 (&client_a, cx_a),
139 &mut [(&client_b, cx_b), (&client_c, cx_c)],
140 )
141 .await;
142
143 let active_call_a = cx_a.read(ActiveCall::global);
144 let active_call_b = cx_b.read(ActiveCall::global);
145 let active_call_c = cx_c.read(ActiveCall::global);
146
147 // Clients A and B join a channel.
148 active_call_a
149 .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
150 .await
151 .unwrap();
152 active_call_b
153 .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
154 .await
155 .unwrap();
156
157 // Clients A, B, and C join a channel buffer
158 // C first so that the replica IDs in the project and the channel buffer are different
159 let channel_buffer_c = client_c
160 .channel_store()
161 .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
162 .await
163 .unwrap();
164 let channel_buffer_b = client_b
165 .channel_store()
166 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
167 .await
168 .unwrap();
169 let channel_buffer_a = client_a
170 .channel_store()
171 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
172 .await
173 .unwrap();
174
175 // Client B shares a project
176 client_b
177 .fs()
178 .insert_tree("/dir", json!({ "file.txt": "contents" }))
179 .await;
180 let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
181 let shared_project_id = active_call_b
182 .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
183 .await
184 .unwrap();
185
186 // Client A joins the project
187 let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
188 deterministic.run_until_parked();
189
190 // Client C is in a separate project.
191 client_c.fs().insert_tree("/dir", json!({})).await;
192 let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
193
194 // Note that each user has a different replica id in the projects vs the
195 // channel buffer.
196 channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
197 assert_eq!(project_a.read(cx).replica_id(), 1);
198 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
199 });
200 channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
201 assert_eq!(project_b.read(cx).replica_id(), 0);
202 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
203 });
204 channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
205 // C is not in the project
206 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
207 });
208
209 let channel_window_a =
210 cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
211 let channel_window_b =
212 cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
213 let channel_window_c = cx_c.add_window(|cx| {
214 ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
215 });
216
217 let channel_view_a = channel_window_a.root(cx_a);
218 let channel_view_b = channel_window_b.root(cx_b);
219 let channel_view_c = channel_window_c.root(cx_c);
220
221 // For clients A and B, the replica ids in the channel buffer are mapped
222 // so that they match the same users' replica ids in their shared project.
223 channel_view_a.read_with(cx_a, |view, cx| {
224 assert_eq!(
225 view.editor.read(cx).replica_id_map().unwrap(),
226 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
227 );
228 });
229 channel_view_b.read_with(cx_b, |view, cx| {
230 assert_eq!(
231 view.editor.read(cx).replica_id_map().unwrap(),
232 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
233 )
234 });
235
236 // Client C only sees themself, as they're not part of any shared project
237 channel_view_c.read_with(cx_c, |view, cx| {
238 assert_eq!(
239 view.editor.read(cx).replica_id_map().unwrap(),
240 &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
241 );
242 });
243
244 // Client C joins the project that clients A and B are in.
245 active_call_c
246 .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
247 .await
248 .unwrap();
249 let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
250 deterministic.run_until_parked();
251 project_c.read_with(cx_c, |project, _| {
252 assert_eq!(project.replica_id(), 2);
253 });
254
255 // For clients A and B, client C's replica id in the channel buffer is
256 // now mapped to their replica id in the shared project.
257 channel_view_a.read_with(cx_a, |view, cx| {
258 assert_eq!(
259 view.editor.read(cx).replica_id_map().unwrap(),
260 &[(1, 0), (2, 1), (0, 2)]
261 .into_iter()
262 .collect::<HashMap<_, _>>()
263 );
264 });
265 channel_view_b.read_with(cx_b, |view, cx| {
266 assert_eq!(
267 view.editor.read(cx).replica_id_map().unwrap(),
268 &[(1, 0), (2, 1), (0, 2)]
269 .into_iter()
270 .collect::<HashMap<_, _>>(),
271 )
272 });
273}
274
275#[gpui::test]
276async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
277 deterministic.forbid_parking();
278 let mut server = TestServer::start(&deterministic).await;
279 let client_a = server.create_client(cx_a, "user_a").await;
280
281 let channel_id = server
282 .make_channel("the-channel", (&client_a, cx_a), &mut [])
283 .await;
284
285 let channel_buffer_1 = client_a
286 .channel_store()
287 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
288 let channel_buffer_2 = client_a
289 .channel_store()
290 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
291 let channel_buffer_3 = client_a
292 .channel_store()
293 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
294
295 // All concurrent tasks for opening a channel buffer return the same model handle.
296 let (channel_buffer, channel_buffer_2, channel_buffer_3) =
297 future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
298 .await
299 .unwrap();
300 let channel_buffer_model_id = channel_buffer.id();
301 assert_eq!(channel_buffer, channel_buffer_2);
302 assert_eq!(channel_buffer, channel_buffer_3);
303
304 channel_buffer.update(cx_a, |buffer, cx| {
305 buffer.buffer().update(cx, |buffer, cx| {
306 buffer.edit([(0..0, "hello")], None, cx);
307 })
308 });
309 deterministic.run_until_parked();
310
311 cx_a.update(|_| {
312 drop(channel_buffer);
313 drop(channel_buffer_2);
314 drop(channel_buffer_3);
315 });
316 deterministic.run_until_parked();
317
318 // The channel buffer can be reopened after dropping it.
319 let channel_buffer = client_a
320 .channel_store()
321 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
322 .await
323 .unwrap();
324 assert_ne!(channel_buffer.id(), channel_buffer_model_id);
325 channel_buffer.update(cx_a, |buffer, cx| {
326 buffer.buffer().update(cx, |buffer, _| {
327 assert_eq!(buffer.text(), "hello");
328 })
329 });
330}
331
332#[gpui::test]
333async fn test_channel_buffer_disconnect(
334 deterministic: Arc<Deterministic>,
335 cx_a: &mut TestAppContext,
336 cx_b: &mut TestAppContext,
337) {
338 deterministic.forbid_parking();
339 let mut server = TestServer::start(&deterministic).await;
340 let client_a = server.create_client(cx_a, "user_a").await;
341 let client_b = server.create_client(cx_b, "user_b").await;
342
343 let channel_id = server
344 .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)])
345 .await;
346
347 let channel_buffer_a = client_a
348 .channel_store()
349 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
350 .await
351 .unwrap();
352 let channel_buffer_b = client_b
353 .channel_store()
354 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
355 .await
356 .unwrap();
357
358 server.forbid_connections();
359 server.disconnect_client(client_a.peer_id().unwrap());
360 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
361
362 channel_buffer_a.update(cx_a, |buffer, _| {
363 assert_eq!(
364 buffer.channel().as_ref(),
365 &Channel {
366 id: channel_id,
367 name: "the-channel".to_string()
368 }
369 );
370 assert!(!buffer.is_connected());
371 });
372
373 deterministic.run_until_parked();
374
375 server.allow_connections();
376 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
377
378 deterministic.run_until_parked();
379
380 client_a
381 .channel_store()
382 .update(cx_a, |channel_store, _| {
383 channel_store.remove_channel(channel_id)
384 })
385 .await
386 .unwrap();
387 deterministic.run_until_parked();
388
389 // Channel buffer observed the deletion
390 channel_buffer_b.update(cx_b, |buffer, _| {
391 assert_eq!(
392 buffer.channel().as_ref(),
393 &Channel {
394 id: channel_id,
395 name: "the-channel".to_string()
396 }
397 );
398 assert!(!buffer.is_connected());
399 });
400}
401
402#[gpui::test]
403async fn test_rejoin_channel_buffer(
404 deterministic: Arc<Deterministic>,
405 cx_a: &mut TestAppContext,
406 cx_b: &mut TestAppContext,
407) {
408 deterministic.forbid_parking();
409 let mut server = TestServer::start(&deterministic).await;
410 let client_a = server.create_client(cx_a, "user_a").await;
411 let client_b = server.create_client(cx_b, "user_b").await;
412
413 let channel_id = server
414 .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)])
415 .await;
416
417 let channel_buffer_a = client_a
418 .channel_store()
419 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
420 .await
421 .unwrap();
422 let channel_buffer_b = client_b
423 .channel_store()
424 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
425 .await
426 .unwrap();
427
428 channel_buffer_a.update(cx_a, |buffer, cx| {
429 buffer.buffer().update(cx, |buffer, cx| {
430 buffer.edit([(0..0, "1")], None, cx);
431 })
432 });
433 deterministic.run_until_parked();
434
435 // Client A disconnects.
436 server.forbid_connections();
437 server.disconnect_client(client_a.peer_id().unwrap());
438
439 // Both clients make an edit.
440 channel_buffer_a.update(cx_a, |buffer, cx| {
441 buffer.buffer().update(cx, |buffer, cx| {
442 buffer.edit([(1..1, "2")], None, cx);
443 })
444 });
445 channel_buffer_b.update(cx_b, |buffer, cx| {
446 buffer.buffer().update(cx, |buffer, cx| {
447 buffer.edit([(0..0, "0")], None, cx);
448 })
449 });
450
451 // Both clients see their own edit.
452 deterministic.run_until_parked();
453 channel_buffer_a.read_with(cx_a, |buffer, cx| {
454 assert_eq!(buffer.buffer().read(cx).text(), "12");
455 });
456 channel_buffer_b.read_with(cx_b, |buffer, cx| {
457 assert_eq!(buffer.buffer().read(cx).text(), "01");
458 });
459
460 // Client A reconnects. Both clients see each other's edits, and see
461 // the same collaborators.
462 server.allow_connections();
463 deterministic.advance_clock(RECEIVE_TIMEOUT);
464 channel_buffer_a.read_with(cx_a, |buffer, cx| {
465 assert_eq!(buffer.buffer().read(cx).text(), "012");
466 });
467 channel_buffer_b.read_with(cx_b, |buffer, cx| {
468 assert_eq!(buffer.buffer().read(cx).text(), "012");
469 });
470
471 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
472 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
473 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
474 });
475 });
476}
477
478#[gpui::test]
479async fn test_channel_buffers_and_server_restarts(
480 deterministic: Arc<Deterministic>,
481 cx_a: &mut TestAppContext,
482 cx_b: &mut TestAppContext,
483 cx_c: &mut TestAppContext,
484) {
485 deterministic.forbid_parking();
486 let mut server = TestServer::start(&deterministic).await;
487 let client_a = server.create_client(cx_a, "user_a").await;
488 let client_b = server.create_client(cx_b, "user_b").await;
489 let client_c = server.create_client(cx_c, "user_c").await;
490
491 let channel_id = server
492 .make_channel(
493 "the-channel",
494 (&client_a, cx_a),
495 &mut [(&client_b, cx_b), (&client_c, cx_c)],
496 )
497 .await;
498
499 let channel_buffer_a = client_a
500 .channel_store()
501 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
502 .await
503 .unwrap();
504 let channel_buffer_b = client_b
505 .channel_store()
506 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
507 .await
508 .unwrap();
509 let _channel_buffer_c = client_c
510 .channel_store()
511 .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
512 .await
513 .unwrap();
514
515 channel_buffer_a.update(cx_a, |buffer, cx| {
516 buffer.buffer().update(cx, |buffer, cx| {
517 buffer.edit([(0..0, "1")], None, cx);
518 })
519 });
520 deterministic.run_until_parked();
521
522 // Client C can't reconnect.
523 client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
524
525 // Server stops.
526 server.reset().await;
527 deterministic.advance_clock(RECEIVE_TIMEOUT);
528
529 // While the server is down, both clients make an edit.
530 channel_buffer_a.update(cx_a, |buffer, cx| {
531 buffer.buffer().update(cx, |buffer, cx| {
532 buffer.edit([(1..1, "2")], None, cx);
533 })
534 });
535 channel_buffer_b.update(cx_b, |buffer, cx| {
536 buffer.buffer().update(cx, |buffer, cx| {
537 buffer.edit([(0..0, "0")], None, cx);
538 })
539 });
540
541 // Server restarts.
542 server.start().await.unwrap();
543 deterministic.advance_clock(CLEANUP_TIMEOUT);
544
545 // Clients reconnects. Clients A and B see each other's edits, and see
546 // that client C has disconnected.
547 channel_buffer_a.read_with(cx_a, |buffer, cx| {
548 assert_eq!(buffer.buffer().read(cx).text(), "012");
549 });
550 channel_buffer_b.read_with(cx_b, |buffer, cx| {
551 assert_eq!(buffer.buffer().read(cx).text(), "012");
552 });
553
554 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
555 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
556 assert_eq!(
557 buffer_a
558 .collaborators()
559 .iter()
560 .map(|c| c.user_id)
561 .collect::<Vec<_>>(),
562 vec![client_a.user_id().unwrap(), client_b.user_id().unwrap()]
563 );
564 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
565 });
566 });
567}
568
569#[track_caller]
570fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
571 assert_eq!(
572 collaborators
573 .into_iter()
574 .map(|collaborator| collaborator.user_id)
575 .collect::<Vec<_>>(),
576 ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
577 );
578}
579
580fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
581 channel_buffer.read_with(cx, |buffer, _| buffer.text())
582}