channel_buffer_tests.rs

  1use crate::{TestServer, test_server::open_channel_notes};
  2use call::ActiveCall;
  3use channel::ACKNOWLEDGE_DEBOUNCE_INTERVAL;
  4use client::{Collaborator, ParticipantIndex, UserId};
  5use collab::rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT};
  6
  7use collab_ui::channel_view::ChannelView;
  8use collections::HashMap;
  9use editor::{Anchor, Editor, MultiBufferOffset, ToOffset};
 10use futures::future;
 11use gpui::{BackgroundExecutor, Context, Entity, TestAppContext, Window};
 12use rpc::{RECEIVE_TIMEOUT, proto::PeerId};
 13use serde_json::json;
 14use std::ops::Range;
 15use util::rel_path::rel_path;
 16use workspace::CollaboratorId;
 17
 18#[gpui::test]
 19async fn test_core_channel_buffers(
 20    executor: BackgroundExecutor,
 21    cx_a: &mut TestAppContext,
 22    cx_b: &mut TestAppContext,
 23) {
 24    let mut server = TestServer::start(executor.clone()).await;
 25    let client_a = server.create_client(cx_a, "user_a").await;
 26    let client_b = server.create_client(cx_b, "user_b").await;
 27
 28    let channel_id = server
 29        .make_channel("zed", None, (&client_a, cx_a), &mut [(&client_b, cx_b)])
 30        .await;
 31
 32    // Client A joins the channel buffer
 33    let channel_buffer_a = client_a
 34        .channel_store()
 35        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 36        .await
 37        .unwrap();
 38
 39    // Client A edits the buffer
 40    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 41    buffer_a.update(cx_a, |buffer, cx| {
 42        buffer.edit([(0..0, "hello world")], None, cx)
 43    });
 44    buffer_a.update(cx_a, |buffer, cx| {
 45        buffer.edit([(5..5, ", cruel")], None, cx)
 46    });
 47    buffer_a.update(cx_a, |buffer, cx| {
 48        buffer.edit([(0..5, "goodbye")], None, cx)
 49    });
 50    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
 51    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 52    executor.run_until_parked();
 53
 54    // Client B joins the channel buffer
 55    let channel_buffer_b = client_b
 56        .channel_store()
 57        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
 58        .await
 59        .unwrap();
 60    channel_buffer_b.read_with(cx_b, |buffer, _| {
 61        assert_collaborators(
 62            buffer.collaborators(),
 63            &[client_a.user_id(), client_b.user_id()],
 64        );
 65    });
 66
 67    // Client B sees the correct text, and then edits it
 68    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
 69    assert_eq!(
 70        buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
 71        buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
 72    );
 73    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
 74    buffer_b.update(cx_b, |buffer, cx| {
 75        buffer.edit([(7..12, "beautiful")], None, cx)
 76    });
 77
 78    // Both A and B see the new edit
 79    executor.run_until_parked();
 80    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
 81    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 82
 83    // Client A closes the channel buffer.
 84    cx_a.update(|_| drop(channel_buffer_a));
 85    executor.run_until_parked();
 86
 87    // Client B sees that client A is gone from the channel buffer.
 88    channel_buffer_b.read_with(cx_b, |buffer, _| {
 89        assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
 90    });
 91
 92    // Client A rejoins the channel buffer
 93    let _channel_buffer_a = client_a
 94        .channel_store()
 95        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
 96        .await
 97        .unwrap();
 98    executor.run_until_parked();
 99
100    // Sanity test, make sure we saw A rejoining
101    channel_buffer_b.read_with(cx_b, |buffer, _| {
102        assert_collaborators(
103            buffer.collaborators(),
104            &[client_a.user_id(), client_b.user_id()],
105        );
106    });
107
108    // Client A loses connection.
109    server.forbid_connections();
110    server.disconnect_client(client_a.peer_id().unwrap());
111    executor.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
112
113    // Client B observes A disconnect
114    channel_buffer_b.read_with(cx_b, |buffer, _| {
115        assert_collaborators(buffer.collaborators(), &[client_b.user_id()]);
116    });
117
118    // TODO:
119    // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
120    // - Test interaction with channel deletion while buffer is open
121}
122
123#[gpui::test]
124async fn test_channel_notes_participant_indices(
125    executor: BackgroundExecutor,
126    cx_a: &mut TestAppContext,
127    cx_b: &mut TestAppContext,
128    cx_c: &mut TestAppContext,
129) {
130    let mut server = TestServer::start(executor.clone()).await;
131    let client_a = server.create_client(cx_a, "user_a").await;
132    let client_b = server.create_client(cx_b, "user_b").await;
133    let client_c = server.create_client(cx_c, "user_c").await;
134
135    let active_call_a = cx_a.read(ActiveCall::global);
136    let active_call_b = cx_b.read(ActiveCall::global);
137
138    cx_a.update(editor::init);
139    cx_b.update(editor::init);
140    cx_c.update(editor::init);
141
142    let channel_id = server
143        .make_channel(
144            "the-channel",
145            None,
146            (&client_a, cx_a),
147            &mut [(&client_b, cx_b), (&client_c, cx_c)],
148        )
149        .await;
150
151    client_a
152        .fs()
153        .insert_tree("/root", json!({"file.txt": "123"}))
154        .await;
155    let (project_a, worktree_id_a) = client_a.build_local_project_with_trust("/root", cx_a).await;
156    let project_b = client_b.build_empty_local_project(false, cx_b);
157    let project_c = client_c.build_empty_local_project(false, cx_c);
158
159    let (workspace_a, mut cx_a) = client_a.build_workspace(&project_a, cx_a);
160    let (workspace_b, mut cx_b) = client_b.build_workspace(&project_b, cx_b);
161    let (workspace_c, cx_c) = client_c.build_workspace(&project_c, cx_c);
162
163    // Clients A, B, and C open the channel notes
164    let channel_view_a = cx_a
165        .update(|window, cx| ChannelView::open(channel_id, None, workspace_a.clone(), window, cx))
166        .await
167        .unwrap();
168    let channel_view_b = cx_b
169        .update(|window, cx| ChannelView::open(channel_id, None, workspace_b.clone(), window, cx))
170        .await
171        .unwrap();
172    let channel_view_c = cx_c
173        .update(|window, cx| ChannelView::open(channel_id, None, workspace_c.clone(), window, cx))
174        .await
175        .unwrap();
176
177    // Clients A, B, and C all insert and select some text
178    channel_view_a.update_in(cx_a, |notes, window, cx| {
179        notes.editor.update(cx, |editor, cx| {
180            editor.insert("a", window, cx);
181            editor.change_selections(Default::default(), window, cx, |selections| {
182                selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
183            });
184        });
185    });
186    executor.run_until_parked();
187    channel_view_b.update_in(cx_b, |notes, window, cx| {
188        notes.editor.update(cx, |editor, cx| {
189            editor.move_down(&Default::default(), window, cx);
190            editor.insert("b", window, cx);
191            editor.change_selections(Default::default(), window, cx, |selections| {
192                selections.select_ranges(vec![MultiBufferOffset(1)..MultiBufferOffset(2)]);
193            });
194        });
195    });
196    executor.run_until_parked();
197    channel_view_c.update_in(cx_c, |notes, window, cx| {
198        notes.editor.update(cx, |editor, cx| {
199            editor.move_down(&Default::default(), window, cx);
200            editor.insert("c", window, cx);
201            editor.change_selections(Default::default(), window, cx, |selections| {
202                selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
203            });
204        });
205    });
206
207    // Client A sees clients B and C without assigned colors, because they aren't
208    // in a call together.
209    executor.run_until_parked();
210    channel_view_a.update_in(cx_a, |notes, window, cx| {
211        notes.editor.update(cx, |editor, cx| {
212            assert_remote_selections(editor, &[(None, 1..2), (None, 2..3)], window, cx);
213        });
214    });
215
216    // Clients A and B join the same call.
217    for (call, cx) in [(&active_call_a, &mut cx_a), (&active_call_b, &mut cx_b)] {
218        call.update(*cx, |call, cx| call.join_channel(channel_id, cx))
219            .await
220            .unwrap();
221    }
222
223    // Clients A and B see each other with two different assigned colors. Client C
224    // still doesn't have a color.
225    executor.run_until_parked();
226    channel_view_a.update_in(cx_a, |notes, window, cx| {
227        notes.editor.update(cx, |editor, cx| {
228            assert_remote_selections(
229                editor,
230                &[(Some(ParticipantIndex(1)), 1..2), (None, 2..3)],
231                window,
232                cx,
233            );
234        });
235    });
236    channel_view_b.update_in(cx_b, |notes, window, cx| {
237        notes.editor.update(cx, |editor, cx| {
238            assert_remote_selections(
239                editor,
240                &[(Some(ParticipantIndex(0)), 0..1), (None, 2..3)],
241                window,
242                cx,
243            );
244        });
245    });
246
247    // Client A shares a project, and client B joins.
248    let project_id = active_call_a
249        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
250        .await
251        .unwrap();
252    let project_b = client_b.join_remote_project(project_id, cx_b).await;
253    let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
254
255    // Clients A and B open the same file.
256    let editor_a = workspace_a
257        .update_in(cx_a, |workspace, window, cx| {
258            workspace.open_path(
259                (worktree_id_a, rel_path("file.txt")),
260                None,
261                true,
262                window,
263                cx,
264            )
265        })
266        .await
267        .unwrap()
268        .downcast::<Editor>()
269        .unwrap();
270    let editor_b = workspace_b
271        .update_in(cx_b, |workspace, window, cx| {
272            workspace.open_path(
273                (worktree_id_a, rel_path("file.txt")),
274                None,
275                true,
276                window,
277                cx,
278            )
279        })
280        .await
281        .unwrap()
282        .downcast::<Editor>()
283        .unwrap();
284
285    editor_a.update_in(cx_a, |editor, window, cx| {
286        editor.change_selections(Default::default(), window, cx, |selections| {
287            selections.select_ranges(vec![MultiBufferOffset(0)..MultiBufferOffset(1)]);
288        });
289    });
290    editor_b.update_in(cx_b, |editor, window, cx| {
291        editor.change_selections(Default::default(), window, cx, |selections| {
292            selections.select_ranges(vec![MultiBufferOffset(2)..MultiBufferOffset(3)]);
293        });
294    });
295    executor.run_until_parked();
296
297    // Clients A and B see each other with the same colors as in the channel notes.
298    editor_a.update_in(cx_a, |editor, window, cx| {
299        assert_remote_selections(editor, &[(Some(ParticipantIndex(1)), 2..3)], window, cx);
300    });
301    editor_b.update_in(cx_b, |editor, window, cx| {
302        assert_remote_selections(editor, &[(Some(ParticipantIndex(0)), 0..1)], window, cx);
303    });
304}
305
306#[track_caller]
307fn assert_remote_selections(
308    editor: &mut Editor,
309    expected_selections: &[(Option<ParticipantIndex>, Range<usize>)],
310    window: &mut Window,
311    cx: &mut Context<Editor>,
312) {
313    let snapshot = editor.snapshot(window, cx);
314    let hub = editor.collaboration_hub().unwrap();
315    let collaborators = hub.collaborators(cx);
316    let range = Anchor::min()..Anchor::max();
317    let remote_selections = snapshot
318        .remote_selections_in_range(&range, hub, cx)
319        .map(|s| {
320            let CollaboratorId::PeerId(peer_id) = s.collaborator_id else {
321                panic!("unexpected collaborator id");
322            };
323            let start = s.selection.start.to_offset(snapshot.buffer_snapshot());
324            let end = s.selection.end.to_offset(snapshot.buffer_snapshot());
325            let user_id = collaborators.get(&peer_id).unwrap().user_id;
326            let participant_index = hub.user_participant_indices(cx).get(&user_id).copied();
327            (participant_index, start.0..end.0)
328        })
329        .collect::<Vec<_>>();
330    assert_eq!(
331        remote_selections, expected_selections,
332        "incorrect remote selections"
333    );
334}
335
336#[gpui::test]
337async fn test_multiple_handles_to_channel_buffer(
338    deterministic: BackgroundExecutor,
339    cx_a: &mut TestAppContext,
340) {
341    let mut server = TestServer::start(deterministic.clone()).await;
342    let client_a = server.create_client(cx_a, "user_a").await;
343
344    let channel_id = server
345        .make_channel("the-channel", None, (&client_a, cx_a), &mut [])
346        .await;
347
348    let channel_buffer_1 = client_a
349        .channel_store()
350        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
351    let channel_buffer_2 = client_a
352        .channel_store()
353        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
354    let channel_buffer_3 = client_a
355        .channel_store()
356        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
357
358    // All concurrent tasks for opening a channel buffer return the same model handle.
359    let (channel_buffer, channel_buffer_2, channel_buffer_3) =
360        future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
361            .await
362            .unwrap();
363    let channel_buffer_entity_id = channel_buffer.entity_id();
364    assert_eq!(channel_buffer, channel_buffer_2);
365    assert_eq!(channel_buffer, channel_buffer_3);
366
367    channel_buffer.update(cx_a, |buffer, cx| {
368        buffer.buffer().update(cx, |buffer, cx| {
369            buffer.edit([(0..0, "hello")], None, cx);
370        })
371    });
372    deterministic.run_until_parked();
373
374    cx_a.update(|_| {
375        drop(channel_buffer);
376        drop(channel_buffer_2);
377        drop(channel_buffer_3);
378    });
379    deterministic.run_until_parked();
380
381    // The channel buffer can be reopened after dropping it.
382    let channel_buffer = client_a
383        .channel_store()
384        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
385        .await
386        .unwrap();
387    assert_ne!(channel_buffer.entity_id(), channel_buffer_entity_id);
388    channel_buffer.update(cx_a, |buffer, cx| {
389        buffer.buffer().update(cx, |buffer, _| {
390            assert_eq!(buffer.text(), "hello");
391        })
392    });
393}
394
395#[gpui::test]
396async fn test_channel_buffer_disconnect(
397    deterministic: BackgroundExecutor,
398    cx_a: &mut TestAppContext,
399    cx_b: &mut TestAppContext,
400) {
401    let mut server = TestServer::start(deterministic.clone()).await;
402    let client_a = server.create_client(cx_a, "user_a").await;
403    let client_b = server.create_client(cx_b, "user_b").await;
404
405    let channel_id = server
406        .make_channel(
407            "the-channel",
408            None,
409            (&client_a, cx_a),
410            &mut [(&client_b, cx_b)],
411        )
412        .await;
413
414    let channel_buffer_a = client_a
415        .channel_store()
416        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
417        .await
418        .unwrap();
419
420    let channel_buffer_b = client_b
421        .channel_store()
422        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
423        .await
424        .unwrap();
425
426    server.forbid_connections();
427    server.disconnect_client(client_a.peer_id().unwrap());
428    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
429
430    channel_buffer_a.update(cx_a, |buffer, cx| {
431        assert_eq!(buffer.channel(cx).unwrap().name, "the-channel");
432        assert!(!buffer.is_connected());
433    });
434
435    deterministic.run_until_parked();
436
437    server.allow_connections();
438    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
439
440    deterministic.run_until_parked();
441
442    client_a
443        .channel_store()
444        .update(cx_a, |channel_store, _| {
445            channel_store.remove_channel(channel_id)
446        })
447        .await
448        .unwrap();
449    deterministic.run_until_parked();
450
451    // Channel buffer observed the deletion
452    channel_buffer_b.update(cx_b, |buffer, cx| {
453        assert!(buffer.channel(cx).is_none());
454        assert!(!buffer.is_connected());
455    });
456}
457
458#[gpui::test]
459async fn test_rejoin_channel_buffer(
460    deterministic: BackgroundExecutor,
461    cx_a: &mut TestAppContext,
462    cx_b: &mut TestAppContext,
463) {
464    let mut server = TestServer::start(deterministic.clone()).await;
465    let client_a = server.create_client(cx_a, "user_a").await;
466    let client_b = server.create_client(cx_b, "user_b").await;
467
468    let channel_id = server
469        .make_channel(
470            "the-channel",
471            None,
472            (&client_a, cx_a),
473            &mut [(&client_b, cx_b)],
474        )
475        .await;
476
477    let channel_buffer_a = client_a
478        .channel_store()
479        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
480        .await
481        .unwrap();
482    let channel_buffer_b = client_b
483        .channel_store()
484        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
485        .await
486        .unwrap();
487
488    channel_buffer_a.update(cx_a, |buffer, cx| {
489        buffer.buffer().update(cx, |buffer, cx| {
490            buffer.edit([(0..0, "1")], None, cx);
491        })
492    });
493    deterministic.run_until_parked();
494
495    // Client A disconnects.
496    server.forbid_connections();
497    server.disconnect_client(client_a.peer_id().unwrap());
498
499    // Both clients make an edit.
500    channel_buffer_a.update(cx_a, |buffer, cx| {
501        buffer.buffer().update(cx, |buffer, cx| {
502            buffer.edit([(1..1, "2")], None, cx);
503        })
504    });
505    channel_buffer_b.update(cx_b, |buffer, cx| {
506        buffer.buffer().update(cx, |buffer, cx| {
507            buffer.edit([(0..0, "0")], None, cx);
508        })
509    });
510
511    // Both clients see their own edit.
512    deterministic.run_until_parked();
513    channel_buffer_a.read_with(cx_a, |buffer, cx| {
514        assert_eq!(buffer.buffer().read(cx).text(), "12");
515    });
516    channel_buffer_b.read_with(cx_b, |buffer, cx| {
517        assert_eq!(buffer.buffer().read(cx).text(), "01");
518    });
519
520    // Client A reconnects. Both clients see each other's edits, and see
521    // the same collaborators.
522    server.allow_connections();
523    deterministic.advance_clock(RECEIVE_TIMEOUT);
524    channel_buffer_a.read_with(cx_a, |buffer, cx| {
525        assert_eq!(buffer.buffer().read(cx).text(), "012");
526    });
527    channel_buffer_b.read_with(cx_b, |buffer, cx| {
528        assert_eq!(buffer.buffer().read(cx).text(), "012");
529    });
530
531    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
532        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
533            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
534        });
535    });
536}
537
538#[gpui::test]
539async fn test_channel_buffers_and_server_restarts(
540    deterministic: BackgroundExecutor,
541    cx_a: &mut TestAppContext,
542    cx_b: &mut TestAppContext,
543    cx_c: &mut TestAppContext,
544) {
545    let mut server = TestServer::start(deterministic.clone()).await;
546    let client_a = server.create_client(cx_a, "user_a").await;
547    let client_b = server.create_client(cx_b, "user_b").await;
548    let client_c = server.create_client(cx_c, "user_c").await;
549
550    let channel_id = server
551        .make_channel(
552            "the-channel",
553            None,
554            (&client_a, cx_a),
555            &mut [(&client_b, cx_b), (&client_c, cx_c)],
556        )
557        .await;
558
559    let channel_buffer_a = client_a
560        .channel_store()
561        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
562        .await
563        .unwrap();
564    let channel_buffer_b = client_b
565        .channel_store()
566        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
567        .await
568        .unwrap();
569    let _channel_buffer_c = client_c
570        .channel_store()
571        .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
572        .await
573        .unwrap();
574
575    channel_buffer_a.update(cx_a, |buffer, cx| {
576        buffer.buffer().update(cx, |buffer, cx| {
577            buffer.edit([(0..0, "1")], None, cx);
578        })
579    });
580    deterministic.run_until_parked();
581
582    // Client C can't reconnect.
583    client_c.override_establish_connection(|_, cx| cx.spawn(async |_| future::pending().await));
584
585    // Server stops.
586    server.reset().await;
587    deterministic.advance_clock(RECEIVE_TIMEOUT);
588
589    // While the server is down, both clients make an edit.
590    channel_buffer_a.update(cx_a, |buffer, cx| {
591        buffer.buffer().update(cx, |buffer, cx| {
592            buffer.edit([(1..1, "2")], None, cx);
593        })
594    });
595    channel_buffer_b.update(cx_b, |buffer, cx| {
596        buffer.buffer().update(cx, |buffer, cx| {
597            buffer.edit([(0..0, "0")], None, cx);
598        })
599    });
600
601    // Server restarts.
602    server.start().await.unwrap();
603    deterministic.advance_clock(CLEANUP_TIMEOUT);
604
605    // Clients reconnects. Clients A and B see each other's edits, and see
606    // that client C has disconnected.
607    channel_buffer_a.read_with(cx_a, |buffer, cx| {
608        assert_eq!(buffer.buffer().read(cx).text(), "012");
609    });
610    channel_buffer_b.read_with(cx_b, |buffer, cx| {
611        assert_eq!(buffer.buffer().read(cx).text(), "012");
612    });
613
614    channel_buffer_a.read_with(cx_a, |buffer_a, _| {
615        channel_buffer_b.read_with(cx_b, |buffer_b, _| {
616            assert_collaborators(
617                buffer_a.collaborators(),
618                &[client_a.user_id(), client_b.user_id()],
619            );
620            assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
621        });
622    });
623}
624
625#[gpui::test]
626async fn test_channel_buffer_changes(
627    deterministic: BackgroundExecutor,
628    cx_a: &mut TestAppContext,
629    cx_b: &mut TestAppContext,
630) {
631    let (server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
632    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
633    let (workspace_b, cx_b) = client_b.build_test_workspace(cx_b).await;
634    let channel_store_b = client_b.channel_store().clone();
635
636    // Editing the channel notes should set them to dirty
637    open_channel_notes(channel_id, cx_a).await.unwrap();
638    cx_a.simulate_keystrokes("1");
639    channel_store_b.read_with(cx_b, |channel_store, _| {
640        assert!(channel_store.has_channel_buffer_changed(channel_id))
641    });
642
643    // Opening the buffer should clear the changed flag.
644    open_channel_notes(channel_id, cx_b).await.unwrap();
645    channel_store_b.read_with(cx_b, |channel_store, _| {
646        assert!(!channel_store.has_channel_buffer_changed(channel_id))
647    });
648
649    // Editing the channel while the buffer is open should not show that the buffer has changed.
650    cx_a.simulate_keystrokes("2");
651    channel_store_b.read_with(cx_b, |channel_store, _| {
652        assert!(!channel_store.has_channel_buffer_changed(channel_id))
653    });
654
655    // Test that the server is tracking things correctly, and we retain our 'not changed'
656    // state across a disconnect
657    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
658    server
659        .simulate_long_connection_interruption(client_b.peer_id().unwrap(), deterministic.clone());
660    channel_store_b.read_with(cx_b, |channel_store, _| {
661        assert!(!channel_store.has_channel_buffer_changed(channel_id))
662    });
663
664    // Closing the buffer should re-enable change tracking
665    cx_b.update(|window, cx| {
666        workspace_b.update(cx, |workspace, cx| {
667            workspace.close_all_items_and_panes(&Default::default(), window, cx)
668        });
669    });
670    deterministic.run_until_parked();
671
672    cx_a.simulate_keystrokes("3");
673    channel_store_b.read_with(cx_b, |channel_store, _| {
674        assert!(channel_store.has_channel_buffer_changed(channel_id))
675    });
676}
677
678#[gpui::test]
679async fn test_channel_buffer_changes_persist(
680    cx_a: &mut TestAppContext,
681    cx_b: &mut TestAppContext,
682    cx_b2: &mut TestAppContext,
683) {
684    let (mut server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
685    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
686    let (_, cx_b) = client_b.build_test_workspace(cx_b).await;
687
688    // a) edits the notes
689    open_channel_notes(channel_id, cx_a).await.unwrap();
690    cx_a.simulate_keystrokes("1");
691    // b) opens them to observe the current version
692    open_channel_notes(channel_id, cx_b).await.unwrap();
693
694    // On boot the client should get the correct state.
695    let client_b2 = server.create_client(cx_b2, "user_b").await;
696    let channel_store_b2 = client_b2.channel_store().clone();
697    channel_store_b2.read_with(cx_b2, |channel_store, _| {
698        assert!(!channel_store.has_channel_buffer_changed(channel_id))
699    });
700}
701
702#[gpui::test]
703async fn test_channel_buffer_operations_lost_on_reconnect(
704    executor: BackgroundExecutor,
705    cx_a: &mut TestAppContext,
706    cx_b: &mut TestAppContext,
707) {
708    let mut server = TestServer::start(executor.clone()).await;
709    let client_a = server.create_client(cx_a, "user_a").await;
710    let client_b = server.create_client(cx_b, "user_b").await;
711
712    let channel_id = server
713        .make_channel(
714            "the-channel",
715            None,
716            (&client_a, cx_a),
717            &mut [(&client_b, cx_b)],
718        )
719        .await;
720
721    // Both clients open the channel buffer.
722    let channel_buffer_a = client_a
723        .channel_store()
724        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
725        .await
726        .unwrap();
727    let channel_buffer_b = client_b
728        .channel_store()
729        .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
730        .await
731        .unwrap();
732
733    // Step 1: Client A makes an initial edit that syncs to B.
734    channel_buffer_a.update(cx_a, |buffer, cx| {
735        buffer.buffer().update(cx, |buffer, cx| {
736            buffer.edit([(0..0, "a")], None, cx);
737        })
738    });
739    executor.run_until_parked();
740
741    // Verify both clients see "a".
742    channel_buffer_a.read_with(cx_a, |buffer, cx| {
743        assert_eq!(buffer.buffer().read(cx).text(), "a");
744    });
745    channel_buffer_b.read_with(cx_b, |buffer, cx| {
746        assert_eq!(buffer.buffer().read(cx).text(), "a");
747    });
748
749    // Step 2: Disconnect client A. Do NOT advance past RECONNECT_TIMEOUT
750    // so that the buffer stays in `opened_buffers` for rejoin.
751    server.forbid_connections();
752    server.disconnect_client(client_a.peer_id().unwrap());
753    executor.run_until_parked();
754
755    // Step 3: While disconnected, client A makes an offline edit ("b").
756    // on_buffer_update fires but client.send() fails because transport is down.
757    channel_buffer_a.update(cx_a, |buffer, cx| {
758        buffer.buffer().update(cx, |buffer, cx| {
759            buffer.edit([(1..1, "b")], None, cx);
760        })
761    });
762    executor.run_until_parked();
763
764    // Client A sees "ab" locally; B still sees "a".
765    channel_buffer_a.read_with(cx_a, |buffer, cx| {
766        assert_eq!(buffer.buffer().read(cx).text(), "ab");
767    });
768    channel_buffer_b.read_with(cx_b, |buffer, cx| {
769        assert_eq!(buffer.buffer().read(cx).text(), "a");
770    });
771
772    // Step 4: Reconnect and make a racing edit in parallel.
773    //
774    // The race condition occurs when:
775    // 1. Transport reconnects, handle_connect captures version V (with "b") and sends RejoinChannelBuffers
776    // 2. DURING the async gap (awaiting response), user makes edit "c"
777    // 3. on_buffer_update sends UpdateChannelBuffer (succeeds because transport is up)
778    // 4. Server receives BOTH messages concurrently (FuturesUnordered)
779    // 5. If UpdateChannelBuffer commits first, server version is inflated to include "c"
780    // 6. RejoinChannelBuffers reads inflated version and sends it back
781    // 7. Client's serialize_ops(inflated_version) filters out "b" (offline edit)
782    //    because the inflated version's timestamp covers "b"'s timestamp
783
784    // Get the buffer handle for spawning
785    let buffer_for_edit = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
786
787    // Spawn the edit task - it will wait for executor to run it
788    let edit_task = cx_a.spawn({
789        let buffer = buffer_for_edit;
790        async move |mut cx| {
791            let _ = buffer.update(&mut cx, |buffer, cx| {
792                buffer.edit([(2..2, "c")], None, cx);
793            });
794        }
795    });
796
797    // Allow connections so reconnect can succeed
798    server.allow_connections();
799
800    // Advance clock to trigger reconnection attempt
801    executor.advance_clock(RECEIVE_TIMEOUT);
802
803    // Run the edit task - this races with handle_connect
804    edit_task.detach();
805
806    // Let everything settle.
807    executor.run_until_parked();
808
809    // Step 7: Read final buffer text from both clients.
810    let text_a = channel_buffer_a.read_with(cx_a, |buffer, cx| buffer.buffer().read(cx).text());
811    let text_b = channel_buffer_b.read_with(cx_b, |buffer, cx| buffer.buffer().read(cx).text());
812
813    // Both clients must see the same text containing all three edits.
814    assert_eq!(
815        text_a, text_b,
816        "Client A and B diverged! A sees {:?}, B sees {:?}. \
817         Operations were lost during reconnection.",
818        text_a, text_b
819    );
820    assert!(
821        text_a.contains('a'),
822        "Initial edit 'a' missing from final text {:?}",
823        text_a
824    );
825    assert!(
826        text_a.contains('b'),
827        "Offline edit 'b' missing from final text {:?}. \
828         This is the reconnection race bug: the offline operation was \
829         filtered out by serialize_ops because the server_version was \
830         inflated by a racing UpdateChannelBuffer.",
831        text_a
832    );
833    assert!(
834        text_a.contains('c'),
835        "Racing edit 'c' missing from final text {:?}",
836        text_a
837    );
838
839    // Step 8: Verify the invariant directly — every operation known to
840    // client A must be observed by client B's version. If any operation
841    // in A's history is not covered by B's version, it was lost.
842    channel_buffer_a.read_with(cx_a, |buf_a, cx_a_inner| {
843        let buffer_a = buf_a.buffer().read(cx_a_inner);
844        let ops_a = buffer_a.operations();
845        channel_buffer_b.read_with(cx_b, |buf_b, cx_b_inner| {
846            let buffer_b = buf_b.buffer().read(cx_b_inner);
847            let version_b = buffer_b.version();
848            for (lamport, _op) in ops_a.iter() {
849                assert!(
850                    version_b.observed(*lamport),
851                    "Operation with lamport timestamp {:?} from client A \
852                     is NOT observed by client B's version. This operation \
853                     was lost during reconnection.",
854                    lamport
855                );
856            }
857        });
858    });
859}
860
861#[track_caller]
862fn assert_collaborators(collaborators: &HashMap<PeerId, Collaborator>, ids: &[Option<UserId>]) {
863    let mut user_ids = collaborators
864        .values()
865        .map(|collaborator| collaborator.user_id)
866        .collect::<Vec<_>>();
867    user_ids.sort();
868    assert_eq!(
869        user_ids,
870        ids.iter().map(|id| id.unwrap()).collect::<Vec<_>>()
871    );
872}
873
874fn buffer_text(channel_buffer: &Entity<language::Buffer>, cx: &mut TestAppContext) -> String {
875    channel_buffer.read_with(cx, |buffer, _| buffer.text())
876}