1use crate::{
2 rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
3 tests::TestServer,
4};
5use call::ActiveCall;
6use channel::Channel;
7use client::UserId;
8use collab_ui::channel_view::ChannelView;
9use collections::HashMap;
10use futures::future;
11use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
12use rpc::{proto, RECEIVE_TIMEOUT};
13use serde_json::json;
14use std::sync::Arc;
15
16#[gpui::test]
17async fn test_core_channel_buffers(
18 deterministic: Arc<Deterministic>,
19 cx_a: &mut TestAppContext,
20 cx_b: &mut TestAppContext,
21) {
22 deterministic.forbid_parking();
23 let mut server = TestServer::start(&deterministic).await;
24 let client_a = server.create_client(cx_a, "user_a").await;
25 let client_b = server.create_client(cx_b, "user_b").await;
26
27 let channel_id = server
28 .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
29 .await;
30
31 // Client A joins the channel buffer
32 let channel_buffer_a = client_a
33 .channel_store()
34 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
35 .await
36 .unwrap();
37
38 // Client A edits the buffer
39 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
40 buffer_a.update(cx_a, |buffer, cx| {
41 buffer.edit([(0..0, "hello world")], None, cx)
42 });
43 buffer_a.update(cx_a, |buffer, cx| {
44 buffer.edit([(5..5, ", cruel")], None, cx)
45 });
46 buffer_a.update(cx_a, |buffer, cx| {
47 buffer.edit([(0..5, "goodbye")], None, cx)
48 });
49 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
50 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
51 deterministic.run_until_parked();
52
53 // Client B joins the channel buffer
54 let channel_buffer_b = client_b
55 .channel_store()
56 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
57 .await
58 .unwrap();
59 channel_buffer_b.read_with(cx_b, |buffer, _| {
60 assert_collaborators(
61 buffer.collaborators(),
62 &[client_a.user_id(), client_b.user_id()],
63 );
64 });
65
66 // Client B sees the correct text, and then edits it
67 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
68 assert_eq!(
69 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
70 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
71 );
72 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
73 buffer_b.update(cx_b, |buffer, cx| {
74 buffer.edit([(7..12, "beautiful")], None, cx)
75 });
76
77 // Both A and B see the new edit
78 deterministic.run_until_parked();
79 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
80 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
81
82 // Client A closes the channel buffer.
83 cx_a.update(|_| drop(channel_buffer_a));
84 deterministic.run_until_parked();
85
86 // Client B sees that client A is gone from the channel buffer.
87 channel_buffer_b.read_with(cx_b, |buffer, _| {
88 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
89 });
90
91 // Client A rejoins the channel buffer
92 let _channel_buffer_a = client_a
93 .channel_store()
94 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
95 .await
96 .unwrap();
97 deterministic.run_until_parked();
98
99 // Sanity test, make sure we saw A rejoining
100 channel_buffer_b.read_with(cx_b, |buffer, _| {
101 assert_collaborators(
102 &buffer.collaborators(),
103 &[client_b.user_id(), client_a.user_id()],
104 );
105 });
106
107 // Client A loses connection.
108 server.forbid_connections();
109 server.disconnect_client(client_a.peer_id().unwrap());
110 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
111
112 // Client B observes A disconnect
113 channel_buffer_b.read_with(cx_b, |buffer, _| {
114 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
115 });
116
117 // TODO:
118 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
119 // - Test interaction with channel deletion while buffer is open
120}
121
122#[gpui::test]
123async fn test_channel_buffer_replica_ids(
124 deterministic: Arc<Deterministic>,
125 cx_a: &mut TestAppContext,
126 cx_b: &mut TestAppContext,
127 cx_c: &mut TestAppContext,
128) {
129 deterministic.forbid_parking();
130 let mut server = TestServer::start(&deterministic).await;
131 let client_a = server.create_client(cx_a, "user_a").await;
132 let client_b = server.create_client(cx_b, "user_b").await;
133 let client_c = server.create_client(cx_c, "user_c").await;
134
135 let channel_id = server
136 .make_channel(
137 "the-channel",
138 None,
139 (&client_a, cx_a),
140 &mut [(&client_b, cx_b), (&client_c, cx_c)],
141 )
142 .await;
143
144 let active_call_a = cx_a.read(ActiveCall::global);
145 let active_call_b = cx_b.read(ActiveCall::global);
146 let active_call_c = cx_c.read(ActiveCall::global);
147
148 // Clients A and B join a channel.
149 active_call_a
150 .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
151 .await
152 .unwrap();
153 active_call_b
154 .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
155 .await
156 .unwrap();
157
158 // Clients A, B, and C join a channel buffer
159 // C first so that the replica IDs in the project and the channel buffer are different
160 let channel_buffer_c = client_c
161 .channel_store()
162 .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
163 .await
164 .unwrap();
165 let channel_buffer_b = client_b
166 .channel_store()
167 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
168 .await
169 .unwrap();
170 let channel_buffer_a = client_a
171 .channel_store()
172 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
173 .await
174 .unwrap();
175
176 // Client B shares a project
177 client_b
178 .fs()
179 .insert_tree("/dir", json!({ "file.txt": "contents" }))
180 .await;
181 let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
182 let shared_project_id = active_call_b
183 .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
184 .await
185 .unwrap();
186
187 // Client A joins the project
188 let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
189 deterministic.run_until_parked();
190
191 // Client C is in a separate project.
192 client_c.fs().insert_tree("/dir", json!({})).await;
193 let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
194
195 // Note that each user has a different replica id in the projects vs the
196 // channel buffer.
197 channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
198 assert_eq!(project_a.read(cx).replica_id(), 1);
199 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
200 });
201 channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
202 assert_eq!(project_b.read(cx).replica_id(), 0);
203 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
204 });
205 channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
206 // C is not in the project
207 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
208 });
209
210 let channel_window_a =
211 cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
212 let channel_window_b =
213 cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
214 let channel_window_c = cx_c.add_window(|cx| {
215 ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
216 });
217
218 let channel_view_a = channel_window_a.root(cx_a);
219 let channel_view_b = channel_window_b.root(cx_b);
220 let channel_view_c = channel_window_c.root(cx_c);
221
222 // For clients A and B, the replica ids in the channel buffer are mapped
223 // so that they match the same users' replica ids in their shared project.
224 channel_view_a.read_with(cx_a, |view, cx| {
225 assert_eq!(
226 view.editor.read(cx).replica_id_map().unwrap(),
227 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
228 );
229 });
230 channel_view_b.read_with(cx_b, |view, cx| {
231 assert_eq!(
232 view.editor.read(cx).replica_id_map().unwrap(),
233 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
234 )
235 });
236
237 // Client C only sees themself, as they're not part of any shared project
238 channel_view_c.read_with(cx_c, |view, cx| {
239 assert_eq!(
240 view.editor.read(cx).replica_id_map().unwrap(),
241 &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
242 );
243 });
244
245 // Client C joins the project that clients A and B are in.
246 active_call_c
247 .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
248 .await
249 .unwrap();
250 let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
251 deterministic.run_until_parked();
252 project_c.read_with(cx_c, |project, _| {
253 assert_eq!(project.replica_id(), 2);
254 });
255
256 // For clients A and B, client C's replica id in the channel buffer is
257 // now mapped to their replica id in the shared project.
258 channel_view_a.read_with(cx_a, |view, cx| {
259 assert_eq!(
260 view.editor.read(cx).replica_id_map().unwrap(),
261 &[(1, 0), (2, 1), (0, 2)]
262 .into_iter()
263 .collect::<HashMap<_, _>>()
264 );
265 });
266 channel_view_b.read_with(cx_b, |view, cx| {
267 assert_eq!(
268 view.editor.read(cx).replica_id_map().unwrap(),
269 &[(1, 0), (2, 1), (0, 2)]
270 .into_iter()
271 .collect::<HashMap<_, _>>(),
272 )
273 });
274}
275
276#[gpui::test]
277async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
278 deterministic.forbid_parking();
279 let mut server = TestServer::start(&deterministic).await;
280 let client_a = server.create_client(cx_a, "user_a").await;
281
282 let channel_id = server
283 .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
284 .await;
285
286 let channel_buffer_1 = client_a
287 .channel_store()
288 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
289 let channel_buffer_2 = client_a
290 .channel_store()
291 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
292 let channel_buffer_3 = client_a
293 .channel_store()
294 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
295
296 // All concurrent tasks for opening a channel buffer return the same model handle.
297 let (channel_buffer, channel_buffer_2, channel_buffer_3) =
298 future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
299 .await
300 .unwrap();
301 let channel_buffer_model_id = channel_buffer.id();
302 assert_eq!(channel_buffer, channel_buffer_2);
303 assert_eq!(channel_buffer, channel_buffer_3);
304
305 channel_buffer.update(cx_a, |buffer, cx| {
306 buffer.buffer().update(cx, |buffer, cx| {
307 buffer.edit([(0..0, "hello")], None, cx);
308 })
309 });
310 deterministic.run_until_parked();
311
312 cx_a.update(|_| {
313 drop(channel_buffer);
314 drop(channel_buffer_2);
315 drop(channel_buffer_3);
316 });
317 deterministic.run_until_parked();
318
319 // The channel buffer can be reopened after dropping it.
320 let channel_buffer = client_a
321 .channel_store()
322 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
323 .await
324 .unwrap();
325 assert_ne!(channel_buffer.id(), channel_buffer_model_id);
326 channel_buffer.update(cx_a, |buffer, cx| {
327 buffer.buffer().update(cx, |buffer, _| {
328 assert_eq!(buffer.text(), "hello");
329 })
330 });
331}
332
333#[gpui::test]
334async fn test_channel_buffer_disconnect(
335 deterministic: Arc<Deterministic>,
336 cx_a: &mut TestAppContext,
337 cx_b: &mut TestAppContext,
338) {
339 deterministic.forbid_parking();
340 let mut server = TestServer::start(&deterministic).await;
341 let client_a = server.create_client(cx_a, "user_a").await;
342 let client_b = server.create_client(cx_b, "user_b").await;
343
344 let channel_id = server
345 .make_channel(
346 "the-channel",
347 None,
348 (&client_a, cx_a),
349 &mut [(&client_b, cx_b)],
350 )
351 .await;
352
353 let channel_buffer_a = client_a
354 .channel_store()
355 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
356 .await
357 .unwrap();
358 let channel_buffer_b = client_b
359 .channel_store()
360 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
361 .await
362 .unwrap();
363
364 server.forbid_connections();
365 server.disconnect_client(client_a.peer_id().unwrap());
366 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
367
368 channel_buffer_a.update(cx_a, |buffer, _| {
369 assert_eq!(
370 buffer.channel().as_ref(),
371 &Channel {
372 id: channel_id,
373 name: "the-channel".to_string()
374 }
375 );
376 assert!(!buffer.is_connected());
377 });
378
379 deterministic.run_until_parked();
380
381 server.allow_connections();
382 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
383
384 deterministic.run_until_parked();
385
386 client_a
387 .channel_store()
388 .update(cx_a, |channel_store, _| {
389 channel_store.remove_channel(channel_id)
390 })
391 .await
392 .unwrap();
393 deterministic.run_until_parked();
394
395 // Channel buffer observed the deletion
396 channel_buffer_b.update(cx_b, |buffer, _| {
397 assert_eq!(
398 buffer.channel().as_ref(),
399 &Channel {
400 id: channel_id,
401 name: "the-channel".to_string()
402 }
403 );
404 assert!(!buffer.is_connected());
405 });
406}
407
408#[gpui::test]
409async fn test_rejoin_channel_buffer(
410 deterministic: Arc<Deterministic>,
411 cx_a: &mut TestAppContext,
412 cx_b: &mut TestAppContext,
413) {
414 deterministic.forbid_parking();
415 let mut server = TestServer::start(&deterministic).await;
416 let client_a = server.create_client(cx_a, "user_a").await;
417 let client_b = server.create_client(cx_b, "user_b").await;
418
419 let channel_id = server
420 .make_channel(
421 "the-channel",
422 None,
423 (&client_a, cx_a),
424 &mut [(&client_b, cx_b)],
425 )
426 .await;
427
428 let channel_buffer_a = client_a
429 .channel_store()
430 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
431 .await
432 .unwrap();
433 let channel_buffer_b = client_b
434 .channel_store()
435 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
436 .await
437 .unwrap();
438
439 channel_buffer_a.update(cx_a, |buffer, cx| {
440 buffer.buffer().update(cx, |buffer, cx| {
441 buffer.edit([(0..0, "1")], None, cx);
442 })
443 });
444 deterministic.run_until_parked();
445
446 // Client A disconnects.
447 server.forbid_connections();
448 server.disconnect_client(client_a.peer_id().unwrap());
449
450 // Both clients make an edit.
451 channel_buffer_a.update(cx_a, |buffer, cx| {
452 buffer.buffer().update(cx, |buffer, cx| {
453 buffer.edit([(1..1, "2")], None, cx);
454 })
455 });
456 channel_buffer_b.update(cx_b, |buffer, cx| {
457 buffer.buffer().update(cx, |buffer, cx| {
458 buffer.edit([(0..0, "0")], None, cx);
459 })
460 });
461
462 // Both clients see their own edit.
463 deterministic.run_until_parked();
464 channel_buffer_a.read_with(cx_a, |buffer, cx| {
465 assert_eq!(buffer.buffer().read(cx).text(), "12");
466 });
467 channel_buffer_b.read_with(cx_b, |buffer, cx| {
468 assert_eq!(buffer.buffer().read(cx).text(), "01");
469 });
470
471 // Client A reconnects. Both clients see each other's edits, and see
472 // the same collaborators.
473 server.allow_connections();
474 deterministic.advance_clock(RECEIVE_TIMEOUT);
475 channel_buffer_a.read_with(cx_a, |buffer, cx| {
476 assert_eq!(buffer.buffer().read(cx).text(), "012");
477 });
478 channel_buffer_b.read_with(cx_b, |buffer, cx| {
479 assert_eq!(buffer.buffer().read(cx).text(), "012");
480 });
481
482 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
483 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
484 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
485 });
486 });
487}
488
489#[gpui::test]
490async fn test_channel_buffers_and_server_restarts(
491 deterministic: Arc<Deterministic>,
492 cx_a: &mut TestAppContext,
493 cx_b: &mut TestAppContext,
494 cx_c: &mut TestAppContext,
495) {
496 deterministic.forbid_parking();
497 let mut server = TestServer::start(&deterministic).await;
498 let client_a = server.create_client(cx_a, "user_a").await;
499 let client_b = server.create_client(cx_b, "user_b").await;
500 let client_c = server.create_client(cx_c, "user_c").await;
501
502 let channel_id = server
503 .make_channel(
504 "the-channel",
505 None,
506 (&client_a, cx_a),
507 &mut [(&client_b, cx_b), (&client_c, cx_c)],
508 )
509 .await;
510
511 let channel_buffer_a = client_a
512 .channel_store()
513 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
514 .await
515 .unwrap();
516 let channel_buffer_b = client_b
517 .channel_store()
518 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
519 .await
520 .unwrap();
521 let _channel_buffer_c = client_c
522 .channel_store()
523 .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
524 .await
525 .unwrap();
526
527 channel_buffer_a.update(cx_a, |buffer, cx| {
528 buffer.buffer().update(cx, |buffer, cx| {
529 buffer.edit([(0..0, "1")], None, cx);
530 })
531 });
532 deterministic.run_until_parked();
533
534 // Client C can't reconnect.
535 client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
536
537 // Server stops.
538 server.reset().await;
539 deterministic.advance_clock(RECEIVE_TIMEOUT);
540
541 // While the server is down, both clients make an edit.
542 channel_buffer_a.update(cx_a, |buffer, cx| {
543 buffer.buffer().update(cx, |buffer, cx| {
544 buffer.edit([(1..1, "2")], None, cx);
545 })
546 });
547 channel_buffer_b.update(cx_b, |buffer, cx| {
548 buffer.buffer().update(cx, |buffer, cx| {
549 buffer.edit([(0..0, "0")], None, cx);
550 })
551 });
552
553 // Server restarts.
554 server.start().await.unwrap();
555 deterministic.advance_clock(CLEANUP_TIMEOUT);
556
557 // Clients reconnects. Clients A and B see each other's edits, and see
558 // that client C has disconnected.
559 channel_buffer_a.read_with(cx_a, |buffer, cx| {
560 assert_eq!(buffer.buffer().read(cx).text(), "012");
561 });
562 channel_buffer_b.read_with(cx_b, |buffer, cx| {
563 assert_eq!(buffer.buffer().read(cx).text(), "012");
564 });
565
566 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
567 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
568 assert_eq!(
569 buffer_a
570 .collaborators()
571 .iter()
572 .map(|c| c.user_id)
573 .collect::<Vec<_>>(),
574 vec![client_a.user_id().unwrap(), client_b.user_id().unwrap()]
575 );
576 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
577 });
578 });
579}
580
581#[track_caller]
582fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
583 assert_eq!(
584 collaborators
585 .into_iter()
586 .map(|collaborator| collaborator.user_id)
587 .collect::<Vec<_>>(),
588 ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
589 );
590}
591
592fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
593 channel_buffer.read_with(cx, |buffer, _| buffer.text())
594}