1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
2use call::ActiveCall;
3use channel::Channel;
4use client::UserId;
5use collab_ui::channel_view::ChannelView;
6use collections::HashMap;
7use futures::future;
8use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
9use rpc::{proto, RECEIVE_TIMEOUT};
10use serde_json::json;
11use std::sync::Arc;
12
13#[gpui::test]
14async fn test_core_channel_buffers(
15 deterministic: Arc<Deterministic>,
16 cx_a: &mut TestAppContext,
17 cx_b: &mut TestAppContext,
18) {
19 deterministic.forbid_parking();
20 let mut server = TestServer::start(&deterministic).await;
21 let client_a = server.create_client(cx_a, "user_a").await;
22 let client_b = server.create_client(cx_b, "user_b").await;
23
24 let channel_id = server
25 .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
26 .await;
27
28 // Client A joins the channel buffer
29 let channel_buffer_a = client_a
30 .channel_store()
31 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
32 .await
33 .unwrap();
34
35 // Client A edits the buffer
36 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
37 buffer_a.update(cx_a, |buffer, cx| {
38 buffer.edit([(0..0, "hello world")], None, cx)
39 });
40 buffer_a.update(cx_a, |buffer, cx| {
41 buffer.edit([(5..5, ", cruel")], None, cx)
42 });
43 buffer_a.update(cx_a, |buffer, cx| {
44 buffer.edit([(0..5, "goodbye")], None, cx)
45 });
46 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
47 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
48 deterministic.run_until_parked();
49
50 // Client B joins the channel buffer
51 let channel_buffer_b = client_b
52 .channel_store()
53 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
54 .await
55 .unwrap();
56 channel_buffer_b.read_with(cx_b, |buffer, _| {
57 assert_collaborators(
58 buffer.collaborators(),
59 &[client_a.user_id(), client_b.user_id()],
60 );
61 });
62
63 // Client B sees the correct text, and then edits it
64 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
65 assert_eq!(
66 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
67 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
68 );
69 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
70 buffer_b.update(cx_b, |buffer, cx| {
71 buffer.edit([(7..12, "beautiful")], None, cx)
72 });
73
74 // Both A and B see the new edit
75 deterministic.run_until_parked();
76 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
77 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
78
79 // Client A closes the channel buffer.
80 cx_a.update(|_| drop(channel_buffer_a));
81 deterministic.run_until_parked();
82
83 // Client B sees that client A is gone from the channel buffer.
84 channel_buffer_b.read_with(cx_b, |buffer, _| {
85 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
86 });
87
88 // Client A rejoins the channel buffer
89 let _channel_buffer_a = client_a
90 .channel_store()
91 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
92 .await
93 .unwrap();
94 deterministic.run_until_parked();
95
96 // Sanity test, make sure we saw A rejoining
97 channel_buffer_b.read_with(cx_b, |buffer, _| {
98 assert_collaborators(
99 &buffer.collaborators(),
100 &[client_b.user_id(), client_a.user_id()],
101 );
102 });
103
104 // Client A loses connection.
105 server.forbid_connections();
106 server.disconnect_client(client_a.peer_id().unwrap());
107 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
108
109 // Client B observes A disconnect
110 channel_buffer_b.read_with(cx_b, |buffer, _| {
111 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
112 });
113
114 // TODO:
115 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
116 // - Test interaction with channel deletion while buffer is open
117}
118
119#[gpui::test]
120async fn test_channel_buffer_replica_ids(
121 deterministic: Arc<Deterministic>,
122 cx_a: &mut TestAppContext,
123 cx_b: &mut TestAppContext,
124 cx_c: &mut TestAppContext,
125) {
126 deterministic.forbid_parking();
127 let mut server = TestServer::start(&deterministic).await;
128 let client_a = server.create_client(cx_a, "user_a").await;
129 let client_b = server.create_client(cx_b, "user_b").await;
130 let client_c = server.create_client(cx_c, "user_c").await;
131
132 let channel_id = server
133 .make_channel(
134 "the-channel",
135 (&client_a, cx_a),
136 &mut [(&client_b, cx_b), (&client_c, cx_c)],
137 )
138 .await;
139
140 let active_call_a = cx_a.read(ActiveCall::global);
141 let active_call_b = cx_b.read(ActiveCall::global);
142 let active_call_c = cx_c.read(ActiveCall::global);
143
144 // Clients A and B join a channel.
145 active_call_a
146 .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
147 .await
148 .unwrap();
149 active_call_b
150 .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
151 .await
152 .unwrap();
153
154 // Clients A, B, and C join a channel buffer
155 // C first so that the replica IDs in the project and the channel buffer are different
156 let channel_buffer_c = client_c
157 .channel_store()
158 .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx))
159 .await
160 .unwrap();
161 let channel_buffer_b = client_b
162 .channel_store()
163 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
164 .await
165 .unwrap();
166 let channel_buffer_a = client_a
167 .channel_store()
168 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
169 .await
170 .unwrap();
171
172 // Client B shares a project
173 client_b
174 .fs()
175 .insert_tree("/dir", json!({ "file.txt": "contents" }))
176 .await;
177 let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
178 let shared_project_id = active_call_b
179 .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
180 .await
181 .unwrap();
182
183 // Client A joins the project
184 let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
185 deterministic.run_until_parked();
186
187 // Client C is in a separate project.
188 client_c.fs().insert_tree("/dir", json!({})).await;
189 let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
190
191 // Note that each user has a different replica id in the projects vs the
192 // channel buffer.
193 channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
194 assert_eq!(project_a.read(cx).replica_id(), 1);
195 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
196 });
197 channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
198 assert_eq!(project_b.read(cx).replica_id(), 0);
199 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
200 });
201 channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
202 // C is not in the project
203 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
204 });
205
206 let channel_window_a =
207 cx_a.add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), cx));
208 let channel_window_b =
209 cx_b.add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), cx));
210 let channel_window_c = cx_c.add_window(|cx| {
211 ChannelView::new(separate_project_c.clone(), channel_buffer_c.clone(), cx)
212 });
213
214 let channel_view_a = channel_window_a.root(cx_a);
215 let channel_view_b = channel_window_b.root(cx_b);
216 let channel_view_c = channel_window_c.root(cx_c);
217
218 // For clients A and B, the replica ids in the channel buffer are mapped
219 // so that they match the same users' replica ids in their shared project.
220 channel_view_a.read_with(cx_a, |view, cx| {
221 assert_eq!(
222 view.editor.read(cx).replica_id_map().unwrap(),
223 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
224 );
225 });
226 channel_view_b.read_with(cx_b, |view, cx| {
227 assert_eq!(
228 view.editor.read(cx).replica_id_map().unwrap(),
229 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
230 )
231 });
232
233 // Client C only sees themself, as they're not part of any shared project
234 channel_view_c.read_with(cx_c, |view, cx| {
235 assert_eq!(
236 view.editor.read(cx).replica_id_map().unwrap(),
237 &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
238 );
239 });
240
241 // Client C joins the project that clients A and B are in.
242 active_call_c
243 .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
244 .await
245 .unwrap();
246 let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
247 deterministic.run_until_parked();
248 project_c.read_with(cx_c, |project, _| {
249 assert_eq!(project.replica_id(), 2);
250 });
251
252 // For clients A and B, client C's replica id in the channel buffer is
253 // now mapped to their replica id in the shared project.
254 channel_view_a.read_with(cx_a, |view, cx| {
255 assert_eq!(
256 view.editor.read(cx).replica_id_map().unwrap(),
257 &[(1, 0), (2, 1), (0, 2)]
258 .into_iter()
259 .collect::<HashMap<_, _>>()
260 );
261 });
262 channel_view_b.read_with(cx_b, |view, cx| {
263 assert_eq!(
264 view.editor.read(cx).replica_id_map().unwrap(),
265 &[(1, 0), (2, 1), (0, 2)]
266 .into_iter()
267 .collect::<HashMap<_, _>>(),
268 )
269 });
270}
271
272#[gpui::test]
273async fn test_reopen_channel_buffer(deterministic: Arc<Deterministic>, cx_a: &mut TestAppContext) {
274 deterministic.forbid_parking();
275 let mut server = TestServer::start(&deterministic).await;
276 let client_a = server.create_client(cx_a, "user_a").await;
277
278 let channel_id = server
279 .make_channel("the-channel", (&client_a, cx_a), &mut [])
280 .await;
281
282 let channel_buffer_1 = client_a
283 .channel_store()
284 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
285 let channel_buffer_2 = client_a
286 .channel_store()
287 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
288 let channel_buffer_3 = client_a
289 .channel_store()
290 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx));
291
292 // All concurrent tasks for opening a channel buffer return the same model handle.
293 let (channel_buffer, channel_buffer_2, channel_buffer_3) =
294 future::try_join3(channel_buffer_1, channel_buffer_2, channel_buffer_3)
295 .await
296 .unwrap();
297 let channel_buffer_model_id = channel_buffer.id();
298 assert_eq!(channel_buffer, channel_buffer_2);
299 assert_eq!(channel_buffer, channel_buffer_3);
300
301 channel_buffer.update(cx_a, |buffer, cx| {
302 buffer.buffer().update(cx, |buffer, cx| {
303 buffer.edit([(0..0, "hello")], None, cx);
304 })
305 });
306 deterministic.run_until_parked();
307
308 cx_a.update(|_| {
309 drop(channel_buffer);
310 drop(channel_buffer_2);
311 drop(channel_buffer_3);
312 });
313 deterministic.run_until_parked();
314
315 // The channel buffer can be reopened after dropping it.
316 let channel_buffer = client_a
317 .channel_store()
318 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
319 .await
320 .unwrap();
321 assert_ne!(channel_buffer.id(), channel_buffer_model_id);
322 channel_buffer.update(cx_a, |buffer, cx| {
323 buffer.buffer().update(cx, |buffer, _| {
324 assert_eq!(buffer.text(), "hello");
325 })
326 });
327}
328
329#[gpui::test]
330async fn test_channel_buffer_disconnect(
331 deterministic: Arc<Deterministic>,
332 cx_a: &mut TestAppContext,
333 cx_b: &mut TestAppContext,
334) {
335 deterministic.forbid_parking();
336 let mut server = TestServer::start(&deterministic).await;
337 let client_a = server.create_client(cx_a, "user_a").await;
338 let client_b = server.create_client(cx_b, "user_b").await;
339
340 let channel_id = server
341 .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)])
342 .await;
343
344 let channel_buffer_a = client_a
345 .channel_store()
346 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
347 .await
348 .unwrap();
349 let channel_buffer_b = client_b
350 .channel_store()
351 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
352 .await
353 .unwrap();
354
355 server.forbid_connections();
356 server.disconnect_client(client_a.peer_id().unwrap());
357 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
358
359 channel_buffer_a.update(cx_a, |buffer, _| {
360 assert_eq!(
361 buffer.channel().as_ref(),
362 &Channel {
363 id: channel_id,
364 name: "the-channel".to_string()
365 }
366 );
367 assert!(!buffer.is_connected());
368 });
369
370 deterministic.run_until_parked();
371
372 server.allow_connections();
373 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
374
375 deterministic.run_until_parked();
376
377 client_a
378 .channel_store()
379 .update(cx_a, |channel_store, _| {
380 channel_store.remove_channel(channel_id)
381 })
382 .await
383 .unwrap();
384 deterministic.run_until_parked();
385
386 // Channel buffer observed the deletion
387 channel_buffer_b.update(cx_b, |buffer, _| {
388 assert_eq!(
389 buffer.channel().as_ref(),
390 &Channel {
391 id: channel_id,
392 name: "the-channel".to_string()
393 }
394 );
395 assert!(!buffer.is_connected());
396 });
397}
398
399#[gpui::test]
400async fn test_rejoin_channel_buffer(
401 deterministic: Arc<Deterministic>,
402 cx_a: &mut TestAppContext,
403 cx_b: &mut TestAppContext,
404) {
405 deterministic.forbid_parking();
406 let mut server = TestServer::start(&deterministic).await;
407 let client_a = server.create_client(cx_a, "user_a").await;
408 let client_b = server.create_client(cx_b, "user_b").await;
409
410 let channel_id = server
411 .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)])
412 .await;
413
414 let channel_buffer_a = client_a
415 .channel_store()
416 .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
417 .await
418 .unwrap();
419 let channel_buffer_b = client_b
420 .channel_store()
421 .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx))
422 .await
423 .unwrap();
424
425 channel_buffer_a.update(cx_a, |buffer, cx| {
426 buffer.buffer().update(cx, |buffer, cx| {
427 buffer.edit([(0..0, "1")], None, cx);
428 })
429 });
430 deterministic.run_until_parked();
431
432 // Client A disconnects.
433 server.forbid_connections();
434 server.disconnect_client(client_a.peer_id().unwrap());
435
436 // Both clients make an edit.
437 channel_buffer_a.update(cx_a, |buffer, cx| {
438 buffer.buffer().update(cx, |buffer, cx| {
439 buffer.edit([(1..1, "2")], None, cx);
440 })
441 });
442 channel_buffer_b.update(cx_b, |buffer, cx| {
443 buffer.buffer().update(cx, |buffer, cx| {
444 buffer.edit([(0..0, "0")], None, cx);
445 })
446 });
447
448 // Both clients see their own edit.
449 deterministic.run_until_parked();
450 channel_buffer_a.read_with(cx_a, |buffer, cx| {
451 assert_eq!(buffer.buffer().read(cx).text(), "12");
452 });
453 channel_buffer_b.read_with(cx_b, |buffer, cx| {
454 assert_eq!(buffer.buffer().read(cx).text(), "01");
455 });
456
457 // Client A reconnects. Both clients see each other's edits, and see
458 // the same collaborators.
459 server.allow_connections();
460 deterministic.advance_clock(RECEIVE_TIMEOUT);
461 channel_buffer_a.read_with(cx_a, |buffer, cx| {
462 assert_eq!(buffer.buffer().read(cx).text(), "012");
463 });
464 channel_buffer_b.read_with(cx_b, |buffer, cx| {
465 assert_eq!(buffer.buffer().read(cx).text(), "012");
466 });
467
468 channel_buffer_a.read_with(cx_a, |buffer_a, _| {
469 channel_buffer_b.read_with(cx_b, |buffer_b, _| {
470 assert_eq!(buffer_a.collaborators(), buffer_b.collaborators());
471 });
472 });
473}
474
475#[track_caller]
476fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
477 assert_eq!(
478 collaborators
479 .into_iter()
480 .map(|collaborator| collaborator.user_id)
481 .collect::<Vec<_>>(),
482 ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
483 );
484}
485
486fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
487 channel_buffer.read_with(cx, |buffer, _| buffer.text())
488}