1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
2use call::ActiveCall;
3use client::UserId;
4use collab_ui::channel_view::ChannelView;
5use collections::HashMap;
6use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
7use rpc::{proto, RECEIVE_TIMEOUT};
8use serde_json::json;
9use std::sync::Arc;
10
11#[gpui::test]
12async fn test_core_channel_buffers(
13 deterministic: Arc<Deterministic>,
14 cx_a: &mut TestAppContext,
15 cx_b: &mut TestAppContext,
16) {
17 deterministic.forbid_parking();
18 let mut server = TestServer::start(&deterministic).await;
19 let client_a = server.create_client(cx_a, "user_a").await;
20 let client_b = server.create_client(cx_b, "user_b").await;
21
22 let zed_id = server
23 .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
24 .await;
25
26 // Client A joins the channel buffer
27 let channel_buffer_a = client_a
28 .channel_store()
29 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
30 .await
31 .unwrap();
32
33 // Client A edits the buffer
34 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
35
36 buffer_a.update(cx_a, |buffer, cx| {
37 buffer.edit([(0..0, "hello world")], None, cx)
38 });
39 buffer_a.update(cx_a, |buffer, cx| {
40 buffer.edit([(5..5, ", cruel")], None, cx)
41 });
42 buffer_a.update(cx_a, |buffer, cx| {
43 buffer.edit([(0..5, "goodbye")], None, cx)
44 });
45 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
46 deterministic.run_until_parked();
47
48 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
49
50 // Client B joins the channel buffer
51 let channel_buffer_b = client_b
52 .channel_store()
53 .update(cx_b, |channel, cx| channel.open_channel_buffer(zed_id, cx))
54 .await
55 .unwrap();
56
57 channel_buffer_b.read_with(cx_b, |buffer, _| {
58 assert_collaborators(
59 buffer.collaborators(),
60 &[client_a.user_id(), client_b.user_id()],
61 );
62 });
63
64 // Client B sees the correct text, and then edits it
65 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
66 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
67 buffer_b.update(cx_b, |buffer, cx| {
68 buffer.edit([(7..12, "beautiful")], None, cx)
69 });
70
71 // Both A and B see the new edit
72 deterministic.run_until_parked();
73 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
74 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
75
76 // Client A closes the channel buffer.
77 cx_a.update(|_| drop(channel_buffer_a));
78 deterministic.run_until_parked();
79
80 // Client B sees that client A is gone from the channel buffer.
81 channel_buffer_b.read_with(cx_b, |buffer, _| {
82 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
83 });
84
85 // Client A rejoins the channel buffer
86 let _channel_buffer_a = client_a
87 .channel_store()
88 .update(cx_a, |channels, cx| {
89 channels.open_channel_buffer(zed_id, cx)
90 })
91 .await
92 .unwrap();
93 deterministic.run_until_parked();
94
95 // Sanity test, make sure we saw A rejoining
96 channel_buffer_b.read_with(cx_b, |buffer, _| {
97 assert_collaborators(
98 &buffer.collaborators(),
99 &[client_b.user_id(), client_a.user_id()],
100 );
101 });
102
103 // Client A loses connection.
104 server.forbid_connections();
105 server.disconnect_client(client_a.peer_id().unwrap());
106 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
107
108 // Client B observes A disconnect
109 channel_buffer_b.read_with(cx_b, |buffer, _| {
110 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
111 });
112
113 // TODO:
114 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
115 // - Test interaction with channel deletion while buffer is open
116}
117
118#[gpui::test]
119async fn test_channel_buffer_replica_ids(
120 deterministic: Arc<Deterministic>,
121 cx_a: &mut TestAppContext,
122 cx_b: &mut TestAppContext,
123 cx_c: &mut TestAppContext,
124) {
125 deterministic.forbid_parking();
126 let mut server = TestServer::start(&deterministic).await;
127 let client_a = server.create_client(cx_a, "user_a").await;
128 let client_b = server.create_client(cx_b, "user_b").await;
129 let client_c = server.create_client(cx_c, "user_c").await;
130
131 let channel_id = server
132 .make_channel(
133 "zed",
134 (&client_a, cx_a),
135 &mut [(&client_b, cx_b), (&client_c, cx_c)],
136 )
137 .await;
138
139 let active_call_a = cx_a.read(ActiveCall::global);
140 let active_call_b = cx_b.read(ActiveCall::global);
141
142 // Clients A and B join a channel.
143 active_call_a
144 .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
145 .await
146 .unwrap();
147 active_call_b
148 .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
149 .await
150 .unwrap();
151
152 // Clients A, B, and C join a channel buffer
153 // C first so that the replica IDs in the project and the channel buffer are different
154 let channel_buffer_c = client_c
155 .channel_store()
156 .update(cx_c, |channel, cx| {
157 channel.open_channel_buffer(channel_id, cx)
158 })
159 .await
160 .unwrap();
161 let channel_buffer_b = client_b
162 .channel_store()
163 .update(cx_b, |channel, cx| {
164 channel.open_channel_buffer(channel_id, cx)
165 })
166 .await
167 .unwrap();
168 let channel_buffer_a = client_a
169 .channel_store()
170 .update(cx_a, |channel, cx| {
171 channel.open_channel_buffer(channel_id, cx)
172 })
173 .await
174 .unwrap();
175
176 // Client B shares a project
177 client_b
178 .fs()
179 .insert_tree("/dir", json!({ "file.txt": "contents" }))
180 .await;
181 let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
182 let shared_project_id = active_call_b
183 .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
184 .await
185 .unwrap();
186
187 // Client A joins the project
188 let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
189 deterministic.run_until_parked();
190
191 // Client C is in a separate project.
192 client_c.fs().insert_tree("/dir", json!({})).await;
193 let (project_c, _) = client_c.build_local_project("/dir", cx_c).await;
194
195 // Note that each user has a different replica id in the projects vs the
196 // channel buffer.
197 channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
198 assert_eq!(project_a.read(cx).replica_id(), 1);
199 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
200 });
201 channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
202 assert_eq!(project_b.read(cx).replica_id(), 0);
203 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
204 });
205 channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
206 // C is not in the project
207 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
208 });
209
210 let channel_window_a = cx_a
211 .add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), None, cx));
212 let channel_window_b = cx_b
213 .add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), None, cx));
214 let channel_window_c = cx_c
215 .add_window(|cx| ChannelView::new(project_c.clone(), channel_buffer_c.clone(), None, cx));
216
217 let channel_view_a = channel_window_a.root(cx_a);
218 let channel_view_b = channel_window_b.root(cx_b);
219 let channel_view_c = channel_window_c.root(cx_c);
220
221 // For clients A and B, the replica ids in the channel buffer are mapped
222 // so that they match the same users' replica ids in their shared project.
223 channel_view_a.read_with(cx_a, |view, cx| {
224 assert_eq!(
225 view.project_replica_ids_by_channel_buffer_replica_id(cx),
226 [(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
227 );
228 });
229 channel_view_b.read_with(cx_b, |view, cx| {
230 assert_eq!(
231 view.project_replica_ids_by_channel_buffer_replica_id(cx),
232 [(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
233 )
234 });
235
236 // Client C only sees themself, as they're not part of any shared project
237 channel_view_c.read_with(cx_c, |view, cx| {
238 assert_eq!(
239 view.project_replica_ids_by_channel_buffer_replica_id(cx),
240 [(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
241 );
242 });
243}
244
245#[track_caller]
246fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
247 assert_eq!(
248 collaborators
249 .into_iter()
250 .map(|collaborator| collaborator.user_id)
251 .collect::<Vec<_>>(),
252 ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
253 );
254}
255
256fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
257 channel_buffer.read_with(cx, |buffer, _| buffer.text())
258}