1use crate::{TestServer, test_server::open_channel_notes};
2use call::ActiveCall;
3use channel::ACKNOWLEDGE_DEBOUNCE_INTERVAL;
4use client::{Collaborator, ParticipantIndex, UserId};
5use collab::rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT};
6
7use collab_ui::channel_view::ChannelView;
8use collections::HashMap;
9use editor::{Anchor, Editor, MultiBufferOffset, ToOffset};
10use futures::future;
11use gpui::{BackgroundExecutor, Context, Entity, TestAppContext, Window};
12use rpc::{RECEIVE_TIMEOUT, proto::PeerId};
13use serde_json::json;
14use std::ops::Range;
15use util::rel_path::rel_path;
16use workspace::CollaboratorId;
17
18#[gpui::test]
19async fn test_core_channel_buffers(
20 executor: BackgroundExecutor,
21 cx_a: &mut TestAppContext,
22 cx_b: &mut TestAppContext,
23) {
24 let mut server = TestServer::start(executor.clone()).await;
25 let client_a = server.create_client(cx_a, "user_a").await;
26 let client_b = server.create_client(cx_b, "user_b").await;
27
28 let channel_id = server
29 .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
30 .await;
31
32 // Client A joins the channel buffer
33 let channel_buffer_a = client_a
34 .channel_store()
35 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
36 .await
37 .unwrap();
38
39 // Client A edits the buffer
40 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
41 buffer_a.update(cx_a, |buffer, cx| {
42 buffer.edit([(0..0, "hello world")], None, cx)
43 });
44 buffer_a.update(cx_a, |buffer, cx| {
45 buffer.edit([(5..5, ", cruel")], None, cx)
46 });
47 buffer_a.update(cx_a, |buffer, cx| {
48 buffer.edit([(0..5, "goodbye")], None, cx)
49 });
50 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
51 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
52 executor.run_until_parked();
53
54 // Client B joins the channel buffer
55 let channel_buffer_b = client_b
56 .channel_store()
57 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
58 .await
59 .unwrap();
60 channel_buffer_b.read_with(cx_b, |buffer, _| {
61 assert_collaborators(
62 buffer.collaborators(),
63 &[client_a.user_id(), client_b.user_id()],
64 );
65 });
66
67 // Client B sees the correct text, and then edits it
68 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
69 assert_eq!(
70 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
71 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
72 );
73 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
74 buffer_b.update(cx_b, |buffer, cx| {
75 buffer.edit([(7..12, "beautiful")], None, cx)
76 });
77
78 // Both A and B see the new edit
79 executor.run_until_parked();
80 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
81 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
82
83 // Client A closes the channel buffer.
84 cx_a.update(|_| drop(channel_buffer_a));
85 executor.run_until_parked();
86
87 // Client B sees that client A is gone from the channel buffer.
88 channel_buffer_b.read_with(cx_b, |buffer, _| {
89 assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
90 });
91
92 // Client A rejoins the channel buffer
93 let _channel_buffer_a = client_a
94 .channel_store()
95 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
96 .await
97 .unwrap();
98 executor.run_until_parked();
99
100 // Sanity test, make sure we saw A rejoining
101 channel_buffer_b.read_with(cx_b, |buffer, _| {
102 assert_collaborators(
103 buffer.collaborators(),
104 &[client_a.user_id(), client_b.user_id()],
105 );
106 });
107
108 // Client A loses connection.
109 server.forbid_connections();
110 server.disconnect_client(client_a.peer_id().unwrap());
111 executor.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
112
113 // Client B observes A disconnect
114 channel_buffer_b.read_with(cx_b, |buffer, _| {
115 assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
116 });
117
118 // TODO:
119 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
120 // - Test interaction with channel deletion while buffer is open
121}
122
123#[gpui::test]
124async fn test_channel_notes_participant_indices(
125 executor: BackgroundExecutor,
126 cx_a: &mut TestAppContext,
127 cx_b: &mut TestAppContext,
128 cx_c: &mut TestAppContext,
129) {
130 let mut server = TestServer::start(executor.clone()).await;
131 let client_a = server.create_client(cx_a, "user_a").await;
132 let client_b = server.create_client(cx_b, "user_b").await;
133 let client_c = server.create_client(cx_c, "user_c").await;
134
135 let active_call_a = cx_a.read(ActiveCall::global);
136 let active_call_b = cx_b.read(ActiveCall::global);
137
138 cx_a.update(editor::init);
139 cx_b.update(editor::init);
140 cx_c.update(editor::init);
141
142 let channel_id = server
143 .make_channel(
144 "the-channel",
145 None,
146 (&client_a, cx_a),
147 &mut [(&client_b, cx_b), (&client_c, cx_c)],
148 )
149 .await;
150
151 client_a
152 .fs()
153 .insert_tree("/root", json!({"file.txt": "123"}))
154 .await;
155 let (project_a, worktree_id_a) = client_a.build_local_project_with_trust("/root", cx_a).await;
156 let project_b = client_b.build_empty_local_project(false, cx_b);
157 let project_c = client_c.build_empty_local_project(false, cx_c);
158
159 let (workspace_a, mut cx_a) = client_a.build_workspace(&project_a, cx_a);
160 let (workspace_b, mut cx_b) = client_b.build_workspace(&project_b, cx_b);
161 let (workspace_c, cx_c) = client_c.build_workspace(&project_c, cx_c);
162
163 // Clients A, B, and C open the channel notes
164 let channel_view_a = cx_a
165 .update(|window, cx| ChannelView::open(channel_id, None, workspace_a.clone(), window, cx))
166 .await
167 .unwrap();
168 let channel_view_b = cx_b
169 .update(|window, cx| ChannelView::open(channel_id, None, workspace_b.clone(), window, cx))
170 .await
171 .unwrap();
172 let channel_view_c = cx_c
173 .update(|window, cx| ChannelView::open(channel_id, None, workspace_c.clone(), window, cx))
174 .await
175 .unwrap();
176
177 // Clients A, B, and C all insert and select some text
178 channel_view_a.update_in(cx_a, |notes, window, cx| {
179 notes.editor.update(cx, |editor, cx| {
180 editor.insert("a", window, cx);
181 editor.change_selections(Default::default(), window, cx, |selections| {
182 selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
183 });
184 });
185 });
186 executor.run_until_parked();
187 channel_view_b.update_in(cx_b, |notes, window, cx| {
188 notes.editor.update(cx, |editor, cx| {
189 editor.move_down(&Default::default(), window, cx);
190 editor.insert("b", window, cx);
191 editor.change_selections(Default::default(), window, cx, |selections| {
192 selections.select_ranges(vec![MultiBufferOffset(1)..MultiBufferOffset(2)]);
193 });
194 });
195 });
196 executor.run_until_parked();
197 channel_view_c.update_in(cx_c, |notes, window, cx| {
198 notes.editor.update(cx, |editor, cx| {
199 editor.move_down(&Default::default(), window, cx);
200 editor.insert("c", window, cx);
201 editor.change_selections(Default::default(), window, cx, |selections| {
202 selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
203 });
204 });
205 });
206
207 // Client A sees clients B and C without assigned colors, because they aren't
208 // in a call together.
209 executor.run_until_parked();
210 channel_view_a.update_in(cx_a, |notes, window, cx| {
211 notes.editor.update(cx, |editor, cx| {
212 assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], window, cx);
213 });
214 });
215
216 // Clients A and B join the same call.
217 for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
218 call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
219 .await
220 .unwrap();
221 }
222
223 // Clients A and B see each other with two different assigned colors. Client C
224 // still doesn't have a color.
225 executor.run_until_parked();
226 channel_view_a.update_in(cx_a, |notes, window, cx| {
227 notes.editor.update(cx, |editor, cx| {
228 assert_remote_selections(
229 editor,
230 &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
231 window,
232 cx,
233 );
234 });
235 });
236 channel_view_b.update_in(cx_b, |notes, window, cx| {
237 notes.editor.update(cx, |editor, cx| {
238 assert_remote_selections(
239 editor,
240 &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
241 window,
242 cx,
243 );
244 });
245 });
246
247 // Client A shares a project, and client B joins.
248 let project_id = active_call_a
249 .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
250 .await
251 .unwrap();
252 let project_b = client_b.join_remote_project(project_id, cx_b).await;
253 let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
254
255 // Clients A and B open the same file.
256 let editor_a = workspace_a
257 .update_in(cx_a, |workspace, window, cx| {
258 workspace.open_path(
259 (worktree_id_a, rel_path("file.txt")),
260 None,
261 true,
262 window,
263 cx,
264 )
265 })
266 .await
267 .unwrap()
268 .downcast::<Editor>()
269 .unwrap();
270 let editor_b = workspace_b
271 .update_in(cx_b, |workspace, window, cx| {
272 workspace.open_path(
273 (worktree_id_a, rel_path("file.txt")),
274 None,
275 true,
276 window,
277 cx,
278 )
279 })
280 .await
281 .unwrap()
282 .downcast::<Editor>()
283 .unwrap();
284
285 editor_a.update_in(cx_a, |editor, window, cx| {
286 editor.change_selections(Default::default(), window, cx, |selections| {
287 selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
288 });
289 });
290 editor_b.update_in(cx_b, |editor, window, cx| {
291 editor.change_selections(Default::default(), window, cx, |selections| {
292 selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
293 });
294 });
295 executor.run_until_parked();
296
297 // Clients A and B see each other with the same colors as in the channel notes.
298 editor_a.update_in(cx_a, |editor, window, cx| {
299 assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], window, cx);
300 });
301 editor_b.update_in(cx_b, |editor, window, cx| {
302 assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], window, cx);
303 });
304}
305
306#[track_caller]
307fn assert_remote_selections(
308 editor: &mut Editor,
309 expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
310 window: &mut Window,
311 cx: &mut Context<Editor>,
312) {
313 let snapshot = editor.snapshot(window, cx);
314 let hub = editor.collaboration_hub().unwrap();
315 let collaborators = hub.collaborators(cx);
316 let range = Anchor::min()..Anchor::max();
317 let remote_selections = snapshot
318 .remote_selections_in_range(&range, hub, cx)
319 .map(|s| {
320 let CollaboratorId::PeerId(peer_id) = s.collaborator_id else {
321 panic!("unexpected collaborator id");
322 };
323 let start = s.selection.start.to_offset(snapshot.buffer_snapshot());
324 let end = s.selection.end.to_offset(snapshot.buffer_snapshot());
325 let user_id = collaborators.get(&peer_id).unwrap().user_id;
326 let participant_index = hub.user_participant_indices(cx).get(&user_id).copied();
327 (participant_index, start.0..end.0)
328 })
329 .collect::<Vec<_>>();
330 assert_eq!(
331 remote_selections, expected_selections,
332 "incorrect remote selections"
333 );
334}
335
336#[gpui::test]
337async fn test_multiple_handles_to_channel_buffer(
338 deterministic: BackgroundExecutor,
339 cx_a: &mut TestAppContext,
340) {
341 let mut server = TestServer::start(deterministic.clone()).await;
342 let client_a = server.create_client(cx_a, "user_a").await;
343
344 let channel_id = server
345 .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
346 .await;
347
348 let channel_buffer_1 = client_a
349 .channel_store()
350 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
351 let channel_buffer_2 = client_a
352 .channel_store()
353 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
354 let channel_buffer_3 = client_a
355 .channel_store()
356 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
357
358 // All concurrent tasks for opening a channel buffer return the same model handle.
359 let (channel_buffer, channel_buffer_2, channel_buffer_3) =
360 future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
361 .await
362 .unwrap();
363 let channel_buffer_entity_id = channel_buffer.entity_id();
364 assert_eq!(channel_buffer, channel_buffer_2);
365 assert_eq!(channel_buffer, channel_buffer_3);
366
367 channel_buffer.update(cx_a, |buffer, cx| {
368 buffer.buffer().update(cx, |buffer, cx| {
369 buffer.edit([(0..0, "hello")], None, cx);
370 })
371 });
372 deterministic.run_until_parked();
373
374 cx_a.update(|_| {
375 drop(channel_buffer);
376 drop(channel_buffer_2);
377 drop(channel_buffer_3);
378 });
379 deterministic.run_until_parked();
380
381 // The channel buffer can be reopened after dropping it.
382 let channel_buffer = client_a
383 .channel_store()
384 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
385 .await
386 .unwrap();
387 assert_ne!(channel_buffer.entity_id(), channel_buffer_entity_id);
388 channel_buffer.update(cx_a, |buffer, cx| {
389 buffer.buffer().update(cx, |buffer, _| {
390 assert_eq!(buffer.text(), "hello");
391 })
392 });
393}
394
395#[gpui::test]
396async fn test_channel_buffer_disconnect(
397 deterministic: BackgroundExecutor,
398 cx_a: &mut TestAppContext,
399 cx_b: &mut TestAppContext,
400) {
401 let mut server = TestServer::start(deterministic.clone()).await;
402 let client_a = server.create_client(cx_a, "user_a").await;
403 let client_b = server.create_client(cx_b, "user_b").await;
404
405 let channel_id = server
406 .make_channel(
407 "the-channel",
408 None,
409 (&client_a, cx_a),
410 &mut [(&client_b, cx_b)],
411 )
412 .await;
413
414 let channel_buffer_a = client_a
415 .channel_store()
416 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
417 .await
418 .unwrap();
419
420 let channel_buffer_b = client_b
421 .channel_store()
422 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
423 .await
424 .unwrap();
425
426 server.forbid_connections();
427 server.disconnect_client(client_a.peer_id().unwrap());
428 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
429
430 channel_buffer_a.update(cx_a, |buffer, cx| {
431 assert_eq!(buffer.channel(cx).unwrap().name, "the-channel");
432 assert!(!buffer.is_connected());
433 });
434
435 deterministic.run_until_parked();
436
437 server.allow_connections();
438 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
439
440 deterministic.run_until_parked();
441
442 client_a
443 .channel_store()
444 .update(cx_a, |channel_store, _| {
445 channel_store.remove_channel(channel_id)
446 })
447 .await
448 .unwrap();
449 deterministic.run_until_parked();
450
451 // Channel buffer observed the deletion
452 channel_buffer_b.update(cx_b, |buffer, cx| {
453 assert!(buffer.channel(cx).is_none());
454 assert!(!buffer.is_connected());
455 });
456}
457
458#[gpui::test]
459async fn test_rejoin_channel_buffer(
460 deterministic: BackgroundExecutor,
461 cx_a: &mut TestAppContext,
462 cx_b: &mut TestAppContext,
463) {
464 let mut server = TestServer::start(deterministic.clone()).await;
465 let client_a = server.create_client(cx_a, "user_a").await;
466 let client_b = server.create_client(cx_b, "user_b").await;
467
468 let channel_id = server
469 .make_channel(
470 "the-channel",
471 None,
472 (&client_a, cx_a),
473 &mut [(&client_b, cx_b)],
474 )
475 .await;
476
477 let channel_buffer_a = client_a
478 .channel_store()
479 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
480 .await
481 .unwrap();
482 let channel_buffer_b = client_b
483 .channel_store()
484 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
485 .await
486 .unwrap();
487
488 channel_buffer_a.update(cx_a, |buffer, cx| {
489 buffer.buffer().update(cx, |buffer, cx| {
490 buffer.edit([(0..0, "1")], None, cx);
491 })
492 });
493 deterministic.run_until_parked();
494
495 // Client A disconnects.
496 server.forbid_connections();
497 server.disconnect_client(client_a.peer_id().unwrap());
498
499 // Both clients make an edit.
500 channel_buffer_a.update(cx_a, |buffer, cx| {
501 buffer.buffer().update(cx, |buffer, cx| {
502 buffer.edit([(1..1, "2")], None, cx);
503 })
504 });
505 channel_buffer_b.update(cx_b, |buffer, cx| {
506 buffer.buffer().update(cx, |buffer, cx| {
507 buffer.edit([(0..0, "0")], None, cx);
508 })
509 });
510
511 // Both clients see their own edit.
512 deterministic.run_until_parked();
513 channel_buffer_a.read_with(cx_a, |buffer, cx| {
514 assert_eq!(buffer.buffer().read(cx).text(), "12");
515 });
516 channel_buffer_b.read_with(cx_b, |buffer, cx| {
517 assert_eq!(buffer.buffer().read(cx).text(), "01");
518 });
519
520 // Client A reconnects. Both clients see each other's edits, and see
521 // the same collaborators.
522 server.allow_connections();
523 deterministic.advance_clock(RECEIVE_TIMEOUT);
524 channel_buffer_a.read_with(cx_a, |buffer, cx| {
525 assert_eq!(buffer.buffer().read(cx).text(), "012");
526 });
527 channel_buffer_b.read_with(cx_b, |buffer, cx| {
528 assert_eq!(buffer.buffer().read(cx).text(), "012");
529 });
530
531 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
532 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
533 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
534 });
535 });
536}
537
538#[gpui::test]
539async fn test_channel_buffers_and_server_restarts(
540 deterministic: BackgroundExecutor,
541 cx_a: &mut TestAppContext,
542 cx_b: &mut TestAppContext,
543 cx_c: &mut TestAppContext,
544) {
545 let mut server = TestServer::start(deterministic.clone()).await;
546 let client_a = server.create_client(cx_a, "user_a").await;
547 let client_b = server.create_client(cx_b, "user_b").await;
548 let client_c = server.create_client(cx_c, "user_c").await;
549
550 let channel_id = server
551 .make_channel(
552 "the-channel",
553 None,
554 (&client_a, cx_a),
555 &mut [(&client_b, cx_b), (&client_c, cx_c)],
556 )
557 .await;
558
559 let channel_buffer_a = client_a
560 .channel_store()
561 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
562 .await
563 .unwrap();
564 let channel_buffer_b = client_b
565 .channel_store()
566 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
567 .await
568 .unwrap();
569 let _channel_buffer_c = client_c
570 .channel_store()
571 .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
572 .await
573 .unwrap();
574
575 channel_buffer_a.update(cx_a, |buffer, cx| {
576 buffer.buffer().update(cx, |buffer, cx| {
577 buffer.edit([(0..0, "1")], None, cx);
578 })
579 });
580 deterministic.run_until_parked();
581
582 // Client C can't reconnect.
583 client_c.override_establish_connection(|_, cx| cx.spawn(async |_| future::pending().await));
584
585 // Server stops.
586 server.reset().await;
587 deterministic.advance_clock(RECEIVE_TIMEOUT);
588
589 // While the server is down, both clients make an edit.
590 channel_buffer_a.update(cx_a, |buffer, cx| {
591 buffer.buffer().update(cx, |buffer, cx| {
592 buffer.edit([(1..1, "2")], None, cx);
593 })
594 });
595 channel_buffer_b.update(cx_b, |buffer, cx| {
596 buffer.buffer().update(cx, |buffer, cx| {
597 buffer.edit([(0..0, "0")], None, cx);
598 })
599 });
600
601 // Server restarts.
602 server.start().await.unwrap();
603 deterministic.advance_clock(CLEANUP_TIMEOUT);
604
605 // Clients reconnects. Clients A and B see each other's edits, and see
606 // that client C has disconnected.
607 channel_buffer_a.read_with(cx_a, |buffer, cx| {
608 assert_eq!(buffer.buffer().read(cx).text(), "012");
609 });
610 channel_buffer_b.read_with(cx_b, |buffer, cx| {
611 assert_eq!(buffer.buffer().read(cx).text(), "012");
612 });
613
614 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
615 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
616 assert_collaborators(
617 buffer_a.collaborators(),
618 &[client_a.user_id(), client_b.user_id()],
619 );
620 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
621 });
622 });
623}
624
625#[gpui::test]
626async fn test_channel_buffer_changes(
627 deterministic: BackgroundExecutor,
628 cx_a: &mut TestAppContext,
629 cx_b: &mut TestAppContext,
630) {
631 let (server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
632 let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
633 let (workspace_b, cx_b) = client_b.build_test_workspace(cx_b).await;
634 let channel_store_b = client_b.channel_store().clone();
635
636 // Editing the channel notes should set them to dirty
637 open_channel_notes(channel_id, cx_a).await.unwrap();
638 cx_a.simulate_keystrokes("1");
639 channel_store_b.read_with(cx_b, |channel_store, _| {
640 assert!(channel_store.has_channel_buffer_changed(channel_id))
641 });
642
643 // Opening the buffer should clear the changed flag.
644 open_channel_notes(channel_id, cx_b).await.unwrap();
645 channel_store_b.read_with(cx_b, |channel_store, _| {
646 assert!(!channel_store.has_channel_buffer_changed(channel_id))
647 });
648
649 // Editing the channel while the buffer is open should not show that the buffer has changed.
650 cx_a.simulate_keystrokes("2");
651 channel_store_b.read_with(cx_b, |channel_store, _| {
652 assert!(!channel_store.has_channel_buffer_changed(channel_id))
653 });
654
655 // Test that the server is tracking things correctly, and we retain our 'not changed'
656 // state across a disconnect
657 deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
658 server
659 .simulate_long_connection_interruption(client_b.peer_id().unwrap(), deterministic.clone());
660 channel_store_b.read_with(cx_b, |channel_store, _| {
661 assert!(!channel_store.has_channel_buffer_changed(channel_id))
662 });
663
664 // Closing the buffer should re-enable change tracking
665 cx_b.update(|window, cx| {
666 workspace_b.update(cx, |workspace, cx| {
667 workspace.close_all_items_and_panes(&Default::default(), window, cx)
668 });
669 });
670 deterministic.run_until_parked();
671
672 cx_a.simulate_keystrokes("3");
673 channel_store_b.read_with(cx_b, |channel_store, _| {
674 assert!(channel_store.has_channel_buffer_changed(channel_id))
675 });
676}
677
678#[gpui::test]
679async fn test_channel_buffer_changes_persist(
680 cx_a: &mut TestAppContext,
681 cx_b: &mut TestAppContext,
682 cx_b2: &mut TestAppContext,
683) {
684 let (mut server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
685 let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
686 let (_, cx_b) = client_b.build_test_workspace(cx_b).await;
687
688 // a) edits the notes
689 open_channel_notes(channel_id, cx_a).await.unwrap();
690 cx_a.simulate_keystrokes("1");
691 // b) opens them to observe the current version
692 open_channel_notes(channel_id, cx_b).await.unwrap();
693
694 // On boot the client should get the correct state.
695 let client_b2 = server.create_client(cx_b2, "user_b").await;
696 let channel_store_b2 = client_b2.channel_store().clone();
697 channel_store_b2.read_with(cx_b2, |channel_store, _| {
698 assert!(!channel_store.has_channel_buffer_changed(channel_id))
699 });
700}
701
702#[gpui::test]
703async fn test_channel_buffer_operations_lost_on_reconnect(
704 executor: BackgroundExecutor,
705 cx_a: &mut TestAppContext,
706 cx_b: &mut TestAppContext,
707) {
708 let mut server = TestServer::start(executor.clone()).await;
709 let client_a = server.create_client(cx_a, "user_a").await;
710 let client_b = server.create_client(cx_b, "user_b").await;
711
712 let channel_id = server
713 .make_channel(
714 "the-channel",
715 None,
716 (&client_a, cx_a),
717 &mut [(&client_b, cx_b)],
718 )
719 .await;
720
721 // Both clients open the channel buffer.
722 let channel_buffer_a = client_a
723 .channel_store()
724 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
725 .await
726 .unwrap();
727 let channel_buffer_b = client_b
728 .channel_store()
729 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
730 .await
731 .unwrap();
732
733 // Step 1: Client A makes an initial edit that syncs to B.
734 channel_buffer_a.update(cx_a, |buffer, cx| {
735 buffer.buffer().update(cx, |buffer, cx| {
736 buffer.edit([(0..0, "a")], None, cx);
737 })
738 });
739 executor.run_until_parked();
740
741 // Verify both clients see "a".
742 channel_buffer_a.read_with(cx_a, |buffer, cx| {
743 assert_eq!(buffer.buffer().read(cx).text(), "a");
744 });
745 channel_buffer_b.read_with(cx_b, |buffer, cx| {
746 assert_eq!(buffer.buffer().read(cx).text(), "a");
747 });
748
749 // Step 2: Disconnect client A. Do NOT advance past RECONNECT_TIMEOUT
750 // so that the buffer stays in `opened_buffers` for rejoin.
751 server.forbid_connections();
752 server.disconnect_client(client_a.peer_id().unwrap());
753 executor.run_until_parked();
754
755 // Step 3: While disconnected, client A makes an offline edit ("b").
756 // on_buffer_update fires but client.send() fails because transport is down.
757 channel_buffer_a.update(cx_a, |buffer, cx| {
758 buffer.buffer().update(cx, |buffer, cx| {
759 buffer.edit([(1..1, "b")], None, cx);
760 })
761 });
762 executor.run_until_parked();
763
764 // Client A sees "ab" locally; B still sees "a".
765 channel_buffer_a.read_with(cx_a, |buffer, cx| {
766 assert_eq!(buffer.buffer().read(cx).text(), "ab");
767 });
768 channel_buffer_b.read_with(cx_b, |buffer, cx| {
769 assert_eq!(buffer.buffer().read(cx).text(), "a");
770 });
771
772 // Step 4: Reconnect and make a racing edit in parallel.
773 //
774 // The race condition occurs when:
775 // 1. Transport reconnects, handle_connect captures version V (with "b") and sends RejoinChannelBuffers
776 // 2. DURING the async gap (awaiting response), user makes edit "c"
777 // 3. on_buffer_update sends UpdateChannelBuffer (succeeds because transport is up)
778 // 4. Server receives BOTH messages concurrently (FuturesUnordered)
779 // 5. If UpdateChannelBuffer commits first, server version is inflated to include "c"
780 // 6. RejoinChannelBuffers reads inflated version and sends it back
781 // 7. Client's serialize_ops(inflated_version) filters out "b" (offline edit)
782 // because the inflated version's timestamp covers "b"'s timestamp
783
784 // Get the buffer handle for spawning
785 let buffer_for_edit = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
786
787 // Spawn the edit task - it will wait for executor to run it
788 let edit_task = cx_a.spawn({
789 let buffer = buffer_for_edit;
790 async move |mut cx| {
791 let _ = buffer.update(&mut cx, |buffer, cx| {
792 buffer.edit([(2..2, "c")], None, cx);
793 });
794 }
795 });
796
797 // Allow connections so reconnect can succeed
798 server.allow_connections();
799
800 // Advance clock to trigger reconnection attempt
801 executor.advance_clock(RECEIVE_TIMEOUT);
802
803 // Run the edit task - this races with handle_connect
804 edit_task.detach();
805
806 // Let everything settle.
807 executor.run_until_parked();
808
809 // Step 7: Read final buffer text from both clients.
810 let text_a = channel_buffer_a.read_with(cx_a, |buffer, cx| buffer.buffer().read(cx).text());
811 let text_b = channel_buffer_b.read_with(cx_b, |buffer, cx| buffer.buffer().read(cx).text());
812
813 // Both clients must see the same text containing all three edits.
814 assert_eq!(
815 text_a, text_b,
816 "Client A and B diverged! A sees {:?}, B sees {:?}. \
817 Operations were lost during reconnection.",
818 text_a, text_b
819 );
820 assert!(
821 text_a.contains('a'),
822 "Initial edit 'a' missing from final text {:?}",
823 text_a
824 );
825 assert!(
826 text_a.contains('b'),
827 "Offline edit 'b' missing from final text {:?}. \
828 This is the reconnection race bug: the offline operation was \
829 filtered out by serialize_ops because the server_version was \
830 inflated by a racing UpdateChannelBuffer.",
831 text_a
832 );
833 assert!(
834 text_a.contains('c'),
835 "Racing edit 'c' missing from final text {:?}",
836 text_a
837 );
838
839 // Step 8: Verify the invariant directly — every operation known to
840 // client A must be observed by client B's version. If any operation
841 // in A's history is not covered by B's version, it was lost.
842 channel_buffer_a.read_with(cx_a, |buf_a, cx_a_inner| {
843 let buffer_a = buf_a.buffer().read(cx_a_inner);
844 let ops_a = buffer_a.operations();
845 channel_buffer_b.read_with(cx_b, |buf_b, cx_b_inner| {
846 let buffer_b = buf_b.buffer().read(cx_b_inner);
847 let version_b = buffer_b.version();
848 for (lamport, _op) in ops_a.iter() {
849 assert!(
850 version_b.observed(*lamport),
851 "Operation with lamport timestamp {:?} from client A \
852 is NOT observed by client B's version. This operation \
853 was lost during reconnection.",
854 lamport
855 );
856 }
857 });
858 });
859}
860
861#[track_caller]
862fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
863 let mut user_ids = collaborators
864 .values()
865 .map(|collaborator| collaborator.user_id)
866 .collect::<Vec<_>>();
867 user_ids.sort();
868 assert_eq!(
869 user_ids,
870 ids.iter().map(|id| id.unwrap()).collect::<Vec<_>>()
871 );
872}
873
874fn buffer_text(channel_buffer: &Entity<language::Buffer>, cx: &mut TestAppContext) -> String {
875 channel_buffer.read_with(cx, |buffer, _| buffer.text())
876}