1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
2use call::ActiveCall;
3use client::UserId;
4use collab_ui::channel_view::ChannelView;
5use collections::HashMap;
6use futures::future;
7use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
8use rpc::{proto, RECEIVE_TIMEOUT};
9use serde_json::json;
10use std::sync::Arc;
11
12#[gpui::test]
13async fn test_core_channel_buffers(
14 deterministic: Arc<Deterministic>,
15 cx_a: &mut TestAppContext,
16 cx_b: &mut TestAppContext,
17) {
18 deterministic.forbid_parking();
19 let mut server = TestServer::start(&deterministic).await;
20 let client_a = server.create_client(cx_a, "user_a").await;
21 let client_b = server.create_client(cx_b, "user_b").await;
22
23 let zed_id = server
24 .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
25 .await;
26
27 // Client A joins the channel buffer
28 let channel_buffer_a = client_a
29 .channel_store()
30 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
31 .await
32 .unwrap();
33
34 // Client A edits the buffer
35 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
36
37 buffer_a.update(cx_a, |buffer, cx| {
38 buffer.edit([(0..0, "hello world")], None, cx)
39 });
40 buffer_a.update(cx_a, |buffer, cx| {
41 buffer.edit([(5..5, ", cruel")], None, cx)
42 });
43 buffer_a.update(cx_a, |buffer, cx| {
44 buffer.edit([(0..5, "goodbye")], None, cx)
45 });
46 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
47 deterministic.run_until_parked();
48
49 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
50
51 // Client B joins the channel buffer
52 let channel_buffer_b = client_b
53 .channel_store()
54 .update(cx_b, |channel, cx| channel.open_channel_buffer(zed_id, cx))
55 .await
56 .unwrap();
57
58 channel_buffer_b.read_with(cx_b, |buffer, _| {
59 assert_collaborators(
60 buffer.collaborators(),
61 &[client_a.user_id(), client_b.user_id()],
62 );
63 });
64
65 // Client B sees the correct text, and then edits it
66 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
67 assert_eq!(
68 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
69 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
70 );
71 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
72 buffer_b.update(cx_b, |buffer, cx| {
73 buffer.edit([(7..12, "beautiful")], None, cx)
74 });
75
76 // Both A and B see the new edit
77 deterministic.run_until_parked();
78 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
79 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
80
81 // Client A closes the channel buffer.
82 cx_a.update(|_| drop(channel_buffer_a));
83 deterministic.run_until_parked();
84
85 // Client B sees that client A is gone from the channel buffer.
86 channel_buffer_b.read_with(cx_b, |buffer, _| {
87 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
88 });
89
90 // Client A rejoins the channel buffer
91 let _channel_buffer_a = client_a
92 .channel_store()
93 .update(cx_a, |channels, cx| {
94 channels.open_channel_buffer(zed_id, cx)
95 })
96 .await
97 .unwrap();
98 deterministic.run_until_parked();
99
100 // Sanity test, make sure we saw A rejoining
101 channel_buffer_b.read_with(cx_b, |buffer, _| {
102 assert_collaborators(
103 &buffer.collaborators(),
104 &[client_b.user_id(), client_a.user_id()],
105 );
106 });
107
108 // Client A loses connection.
109 server.forbid_connections();
110 server.disconnect_client(client_a.peer_id().unwrap());
111 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
112
113 // Client B observes A disconnect
114 channel_buffer_b.read_with(cx_b, |buffer, _| {
115 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
116 });
117
118 // TODO:
119 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
120 // - Test interaction with channel deletion while buffer is open
121}
122
123#[gpui::test]
124async fn test_channel_buffer_replica_ids(
125 deterministic: Arc<Deterministic>,
126 cx_a: &mut TestAppContext,
127 cx_b: &mut TestAppContext,
128 cx_c: &mut TestAppContext,
129) {
130 deterministic.forbid_parking();
131 let mut server = TestServer::start(&deterministic).await;
132 let client_a = server.create_client(cx_a, "user_a").await;
133 let client_b = server.create_client(cx_b, "user_b").await;
134 let client_c = server.create_client(cx_c, "user_c").await;
135
136 let channel_id = server
137 .make_channel(
138 "zed",
139 (&client_a, cx_a),
140 &mut [(&client_b, cx_b), (&client_c, cx_c)],
141 )
142 .await;
143
144 let active_call_a = cx_a.read(ActiveCall::global);
145 let active_call_b = cx_b.read(ActiveCall::global);
146 let active_call_c = cx_c.read(ActiveCall::global);
147
148 // Clients A and B join a channel.
149 active_call_a
150 .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
151 .await
152 .unwrap();
153 active_call_b
154 .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
155 .await
156 .unwrap();
157
158 // Clients A, B, and C join a channel buffer
159 // C first so that the replica IDs in the project and the channel buffer are different
160 let channel_buffer_c = client_c
161 .channel_store()
162 .update(cx_c, |channel, cx| {
163 channel.open_channel_buffer(channel_id, cx)
164 })
165 .await
166 .unwrap();
167 let channel_buffer_b = client_b
168 .channel_store()
169 .update(cx_b, |channel, cx| {
170 channel.open_channel_buffer(channel_id, cx)
171 })
172 .await
173 .unwrap();
174 let channel_buffer_a = client_a
175 .channel_store()
176 .update(cx_a, |channel, cx| {
177 channel.open_channel_buffer(channel_id, cx)
178 })
179 .await
180 .unwrap();
181
182 // Client B shares a project
183 client_b
184 .fs()
185 .insert_tree("/dir", json!({ "file.txt": "contents" }))
186 .await;
187 let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
188 let shared_project_id = active_call_b
189 .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
190 .await
191 .unwrap();
192
193 // Client A joins the project
194 let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
195 deterministic.run_until_parked();
196
197 // Client C is in a separate project.
198 client_c.fs().insert_tree("/dir", json!({})).await;
199 let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
200
201 // Note that each user has a different replica id in the projects vs the
202 // channel buffer.
203 channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
204 assert_eq!(project_a.read(cx).replica_id(), 1);
205 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
206 });
207 channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
208 assert_eq!(project_b.read(cx).replica_id(), 0);
209 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
210 });
211 channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
212 // C is not in the project
213 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
214 });
215
216 let channel_window_a =
217 cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
218 let channel_window_b =
219 cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
220 let channel_window_c = cx_c.add_window(|cx| {
221 ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
222 });
223
224 let channel_view_a = channel_window_a.root(cx_a);
225 let channel_view_b = channel_window_b.root(cx_b);
226 let channel_view_c = channel_window_c.root(cx_c);
227
228 // For clients A and B, the replica ids in the channel buffer are mapped
229 // so that they match the same users' replica ids in their shared project.
230 channel_view_a.read_with(cx_a, |view, cx| {
231 assert_eq!(
232 view.editor.read(cx).replica_id_map().unwrap(),
233 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
234 );
235 });
236 channel_view_b.read_with(cx_b, |view, cx| {
237 assert_eq!(
238 view.editor.read(cx).replica_id_map().unwrap(),
239 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
240 )
241 });
242
243 // Client C only sees themself, as they're not part of any shared project
244 channel_view_c.read_with(cx_c, |view, cx| {
245 assert_eq!(
246 view.editor.read(cx).replica_id_map().unwrap(),
247 &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
248 );
249 });
250
251 // Client C joins the project that clients A and B are in.
252 active_call_c
253 .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
254 .await
255 .unwrap();
256 let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
257 deterministic.run_until_parked();
258 project_c.read_with(cx_c, |project, _| {
259 assert_eq!(project.replica_id(), 2);
260 });
261
262 // For clients A and B, client C's replica id in the channel buffer is
263 // now mapped to their replica id in the shared project.
264 channel_view_a.read_with(cx_a, |view, cx| {
265 assert_eq!(
266 view.editor.read(cx).replica_id_map().unwrap(),
267 &[(1, 0), (2, 1), (0, 2)]
268 .into_iter()
269 .collect::<HashMap<_, _>>()
270 );
271 });
272 channel_view_b.read_with(cx_b, |view, cx| {
273 assert_eq!(
274 view.editor.read(cx).replica_id_map().unwrap(),
275 &[(1, 0), (2, 1), (0, 2)]
276 .into_iter()
277 .collect::<HashMap<_, _>>(),
278 )
279 });
280}
281
282#[gpui::test]
283async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
284 deterministic.forbid_parking();
285 let mut server = TestServer::start(&deterministic).await;
286 let client_a = server.create_client(cx_a, "user_a").await;
287
288 let zed_id = server.make_channel("zed", (&client_a, cx_a), &mut []).await;
289
290 let channel_buffer_1 = client_a
291 .channel_store()
292 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
293 let channel_buffer_2 = client_a
294 .channel_store()
295 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
296 let channel_buffer_3 = client_a
297 .channel_store()
298 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx));
299
300 // All concurrent tasks for opening a channel buffer return the same model handle.
301 let (channel_buffer_1, channel_buffer_2, channel_buffer_3) =
302 future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
303 .await
304 .unwrap();
305 let model_id = channel_buffer_1.id();
306 assert_eq!(channel_buffer_1, channel_buffer_2);
307 assert_eq!(channel_buffer_1, channel_buffer_3);
308
309 channel_buffer_1.update(cx_a, |buffer, cx| {
310 buffer.buffer().update(cx, |buffer, cx| {
311 buffer.edit([(0..0, "hello")], None, cx);
312 })
313 });
314 deterministic.run_until_parked();
315
316 cx_a.update(|_| {
317 drop(channel_buffer_1);
318 drop(channel_buffer_2);
319 drop(channel_buffer_3);
320 });
321 deterministic.run_until_parked();
322
323 // The channel buffer can be reopened after dropping it.
324 let channel_buffer = client_a
325 .channel_store()
326 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
327 .await
328 .unwrap();
329 assert_ne!(channel_buffer.id(), model_id);
330 channel_buffer.update(cx_a, |buffer, cx| {
331 buffer.buffer().update(cx, |buffer, _| {
332 assert_eq!(buffer.text(), "hello");
333 })
334 });
335}
336
337#[track_caller]
338fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
339 assert_eq!(
340 collaborators
341 .into_iter()
342 .map(|collaborator| collaborator.user_id)
343 .collect::<Vec<_>>(),
344 ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
345 );
346}
347
348fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
349 channel_buffer.read_with(cx, |buffer, _| buffer.text())
350}