1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
2use call::ActiveCall;
3use client::UserId;
4use collab_ui::channel_view::ChannelView;
5use collections::HashMap;
6use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
7use rpc::{proto, RECEIVE_TIMEOUT};
8use serde_json::json;
9use std::sync::Arc;
10
11#[gpui::test]
12async fn test_core_channel_buffers(
13 deterministic: Arc<Deterministic>,
14 cx_a: &mut TestAppContext,
15 cx_b: &mut TestAppContext,
16) {
17 deterministic.forbid_parking();
18 let mut server = TestServer::start(&deterministic).await;
19 let client_a = server.create_client(cx_a, "user_a").await;
20 let client_b = server.create_client(cx_b, "user_b").await;
21
22 let zed_id = server
23 .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
24 .await;
25
26 // Client A joins the channel buffer
27 let channel_buffer_a = client_a
28 .channel_store()
29 .update(cx_a, |channel, cx| channel.open_channel_buffer(zed_id, cx))
30 .await
31 .unwrap();
32
33 // Client A edits the buffer
34 let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
35
36 buffer_a.update(cx_a, |buffer, cx| {
37 buffer.edit([(0..0, "hello world")], None, cx)
38 });
39 buffer_a.update(cx_a, |buffer, cx| {
40 buffer.edit([(5..5, ", cruel")], None, cx)
41 });
42 buffer_a.update(cx_a, |buffer, cx| {
43 buffer.edit([(0..5, "goodbye")], None, cx)
44 });
45 buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
46 deterministic.run_until_parked();
47
48 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
49
50 // Client B joins the channel buffer
51 let channel_buffer_b = client_b
52 .channel_store()
53 .update(cx_b, |channel, cx| channel.open_channel_buffer(zed_id, cx))
54 .await
55 .unwrap();
56
57 channel_buffer_b.read_with(cx_b, |buffer, _| {
58 assert_collaborators(
59 buffer.collaborators(),
60 &[client_a.user_id(), client_b.user_id()],
61 );
62 });
63
64 // Client B sees the correct text, and then edits it
65 let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
66 assert_eq!(
67 buffer_b.read_with(cx_b, |buffer, _| buffer.remote_id()),
68 buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id())
69 );
70 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
71 buffer_b.update(cx_b, |buffer, cx| {
72 buffer.edit([(7..12, "beautiful")], None, cx)
73 });
74
75 // Both A and B see the new edit
76 deterministic.run_until_parked();
77 assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
78 assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
79
80 // Client A closes the channel buffer.
81 cx_a.update(|_| drop(channel_buffer_a));
82 deterministic.run_until_parked();
83
84 // Client B sees that client A is gone from the channel buffer.
85 channel_buffer_b.read_with(cx_b, |buffer, _| {
86 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
87 });
88
89 // Client A rejoins the channel buffer
90 let _channel_buffer_a = client_a
91 .channel_store()
92 .update(cx_a, |channels, cx| {
93 channels.open_channel_buffer(zed_id, cx)
94 })
95 .await
96 .unwrap();
97 deterministic.run_until_parked();
98
99 // Sanity test, make sure we saw A rejoining
100 channel_buffer_b.read_with(cx_b, |buffer, _| {
101 assert_collaborators(
102 &buffer.collaborators(),
103 &[client_b.user_id(), client_a.user_id()],
104 );
105 });
106
107 // Client A loses connection.
108 server.forbid_connections();
109 server.disconnect_client(client_a.peer_id().unwrap());
110 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
111
112 // Client B observes A disconnect
113 channel_buffer_b.read_with(cx_b, |buffer, _| {
114 assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
115 });
116
117 // TODO:
118 // - Test synchronizing offline updates, what happens to A's channel buffer when A disconnects
119 // - Test interaction with channel deletion while buffer is open
120}
121
122#[gpui::test]
123async fn test_channel_buffer_replica_ids(
124 deterministic: Arc<Deterministic>,
125 cx_a: &mut TestAppContext,
126 cx_b: &mut TestAppContext,
127 cx_c: &mut TestAppContext,
128) {
129 deterministic.forbid_parking();
130 let mut server = TestServer::start(&deterministic).await;
131 let client_a = server.create_client(cx_a, "user_a").await;
132 let client_b = server.create_client(cx_b, "user_b").await;
133 let client_c = server.create_client(cx_c, "user_c").await;
134
135 let channel_id = server
136 .make_channel(
137 "zed",
138 (&client_a, cx_a),
139 &mut [(&client_b, cx_b), (&client_c, cx_c)],
140 )
141 .await;
142
143 let active_call_a = cx_a.read(ActiveCall::global);
144 let active_call_b = cx_b.read(ActiveCall::global);
145 let active_call_c = cx_c.read(ActiveCall::global);
146
147 // Clients A and B join a channel.
148 active_call_a
149 .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
150 .await
151 .unwrap();
152 active_call_b
153 .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
154 .await
155 .unwrap();
156
157 // Clients A, B, and C join a channel buffer
158 // C first so that the replica IDs in the project and the channel buffer are different
159 let channel_buffer_c = client_c
160 .channel_store()
161 .update(cx_c, |channel, cx| {
162 channel.open_channel_buffer(channel_id, cx)
163 })
164 .await
165 .unwrap();
166 let channel_buffer_b = client_b
167 .channel_store()
168 .update(cx_b, |channel, cx| {
169 channel.open_channel_buffer(channel_id, cx)
170 })
171 .await
172 .unwrap();
173 let channel_buffer_a = client_a
174 .channel_store()
175 .update(cx_a, |channel, cx| {
176 channel.open_channel_buffer(channel_id, cx)
177 })
178 .await
179 .unwrap();
180
181 // Client B shares a project
182 client_b
183 .fs()
184 .insert_tree("/dir", json!({ "file.txt": "contents" }))
185 .await;
186 let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
187 let shared_project_id = active_call_b
188 .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
189 .await
190 .unwrap();
191
192 // Client A joins the project
193 let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
194 deterministic.run_until_parked();
195
196 // Client C is in a separate project.
197 client_c.fs().insert_tree("/dir", json!({})).await;
198 let (separate_project_c, _) = client_c.build_local_project("/dir", cx_c).await;
199
200 // Note that each user has a different replica id in the projects vs the
201 // channel buffer.
202 channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
203 assert_eq!(project_a.read(cx).replica_id(), 1);
204 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
205 });
206 channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
207 assert_eq!(project_b.read(cx).replica_id(), 0);
208 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
209 });
210 channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
211 // C is not in the project
212 assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
213 });
214
215 let channel_window_a = cx_a
216 .add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), None, cx));
217 let channel_window_b = cx_b
218 .add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), None, cx));
219 let channel_window_c = cx_c.add_window(|cx| {
220 ChannelView::new(
221 separate_project_c.clone(),
222 channel_buffer_c.clone(),
223 None,
224 cx,
225 )
226 });
227
228 let channel_view_a = channel_window_a.root(cx_a);
229 let channel_view_b = channel_window_b.root(cx_b);
230 let channel_view_c = channel_window_c.root(cx_c);
231
232 // For clients A and B, the replica ids in the channel buffer are mapped
233 // so that they match the same users' replica ids in their shared project.
234 channel_view_a.read_with(cx_a, |view, cx| {
235 assert_eq!(
236 view.editor.read(cx).replica_id_map().unwrap(),
237 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
238 );
239 });
240 channel_view_b.read_with(cx_b, |view, cx| {
241 assert_eq!(
242 view.editor.read(cx).replica_id_map().unwrap(),
243 &[(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
244 )
245 });
246
247 // Client C only sees themself, as they're not part of any shared project
248 channel_view_c.read_with(cx_c, |view, cx| {
249 assert_eq!(
250 view.editor.read(cx).replica_id_map().unwrap(),
251 &[(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
252 );
253 });
254
255 // Client C joins the project that clients A and B are in.
256 active_call_c
257 .update(cx_c, |call, cx| call.join_channel(channel_id, cx))
258 .await
259 .unwrap();
260 let project_c = client_c.build_remote_project(shared_project_id, cx_c).await;
261 deterministic.run_until_parked();
262 project_c.read_with(cx_c, |project, _| {
263 assert_eq!(project.replica_id(), 2);
264 });
265
266 // For clients A and B, client C's replica id in the channel buffer is
267 // now mapped to their replica id in the shared project.
268 channel_view_a.read_with(cx_a, |view, cx| {
269 assert_eq!(
270 view.editor.read(cx).replica_id_map().unwrap(),
271 &[(1, 0), (2, 1), (0, 2)]
272 .into_iter()
273 .collect::<HashMap<_, _>>()
274 );
275 });
276 channel_view_b.read_with(cx_b, |view, cx| {
277 assert_eq!(
278 view.editor.read(cx).replica_id_map().unwrap(),
279 &[(1, 0), (2, 1), (0, 2)]
280 .into_iter()
281 .collect::<HashMap<_, _>>(),
282 )
283 });
284}
285
286#[track_caller]
287fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
288 assert_eq!(
289 collaborators
290 .into_iter()
291 .map(|collaborator| collaborator.user_id)
292 .collect::<Vec<_>>(),
293 ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
294 );
295}
296
297fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
298 channel_buffer.read_with(cx, |buffer, _| buffer.text())
299}