1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use text::{Buffer, ReplicaId};
5
6test_both_dbs!(
7 test_channel_buffers,
8 test_channel_buffers_postgres,
9 test_channel_buffers_sqlite
10);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 None,
17 false,
18 NewUserParams {
19 github_login: "user_a".into(),
20 github_user_id: 101,
21 },
22 )
23 .await
24 .unwrap()
25 .user_id;
26 let b_id = db
27 .create_user(
28 "user_b@example.com",
29 None,
30 false,
31 NewUserParams {
32 github_login: "user_b".into(),
33 github_user_id: 102,
34 },
35 )
36 .await
37 .unwrap()
38 .user_id;
39
40 // This user will not be a part of the channel
41 let c_id = db
42 .create_user(
43 "user_c@example.com",
44 None,
45 false,
46 NewUserParams {
47 github_login: "user_c".into(),
48 github_user_id: 103,
49 },
50 )
51 .await
52 .unwrap()
53 .user_id;
54
55 let owner_id = db.create_server("production").await.unwrap().0 as u32;
56
57 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
58
59 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
60 .await
61 .unwrap();
62
63 db.respond_to_channel_invite(zed_id, b_id, true)
64 .await
65 .unwrap();
66
67 let connection_id_a = ConnectionId { owner_id, id: 1 };
68 let _ = db
69 .join_channel_buffer(zed_id, a_id, connection_id_a)
70 .await
71 .unwrap();
72
73 let mut buffer_a = Buffer::new(
74 ReplicaId::new(0),
75 text::BufferId::new(1).unwrap(),
76 "".to_string(),
77 &db.test_options.as_ref().unwrap().executor,
78 );
79 let operations = vec![
80 buffer_a.edit(
81 [(0..0, "hello world")],
82 &db.test_options.as_ref().unwrap().executor,
83 ),
84 buffer_a.edit(
85 [(5..5, ", cruel")],
86 &db.test_options.as_ref().unwrap().executor,
87 ),
88 buffer_a.edit(
89 [(0..5, "goodbye")],
90 &db.test_options.as_ref().unwrap().executor,
91 ),
92 buffer_a.undo().unwrap().1,
93 ];
94 assert_eq!(buffer_a.text(), "hello, cruel world");
95
96 let operations = operations
97 .into_iter()
98 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
99 .collect::<Vec<_>>();
100
101 db.update_channel_buffer(zed_id, a_id, &operations)
102 .await
103 .unwrap();
104
105 let connection_id_b = ConnectionId { owner_id, id: 2 };
106 let buffer_response_b = db
107 .join_channel_buffer(zed_id, b_id, connection_id_b)
108 .await
109 .unwrap();
110
111 let mut buffer_b = Buffer::new(
112 ReplicaId::new(0),
113 text::BufferId::new(1).unwrap(),
114 buffer_response_b.base_text,
115 &db.test_options.as_ref().unwrap().executor,
116 );
117 buffer_b.apply_ops(
118 buffer_response_b.operations.into_iter().map(|operation| {
119 let operation = proto::deserialize_operation(operation).unwrap();
120 if let language::Operation::Buffer(operation) = operation {
121 operation
122 } else {
123 unreachable!()
124 }
125 }),
126 None,
127 );
128
129 assert_eq!(buffer_b.text(), "hello, cruel world");
130
131 // Ensure that C fails to open the buffer
132 assert!(
133 db.join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
134 .await
135 .is_err()
136 );
137
138 // Ensure that both collaborators have shown up
139 assert_eq!(
140 buffer_response_b.collaborators,
141 &[
142 rpc::proto::Collaborator {
143 user_id: a_id.to_proto(),
144 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
145 replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32,
146 is_host: false,
147 committer_name: None,
148 committer_email: None,
149 },
150 rpc::proto::Collaborator {
151 user_id: b_id.to_proto(),
152 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
153 replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32 + 1,
154 is_host: false,
155 committer_name: None,
156 committer_email: None,
157 }
158 ]
159 );
160
161 // Ensure that get_channel_buffer_collaborators works
162 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
163 assert_eq!(zed_collaborats, &[a_id, b_id]);
164
165 let left_buffer = db
166 .leave_channel_buffer(zed_id, connection_id_b)
167 .await
168 .unwrap();
169
170 assert_eq!(left_buffer.connections, &[connection_id_a],);
171
172 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
173 let _ = db
174 .join_channel_buffer(cargo_id, a_id, connection_id_a)
175 .await
176 .unwrap();
177
178 db.leave_channel_buffers(connection_id_a).await.unwrap();
179
180 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
181 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
182 assert_eq!(zed_collaborators, &[]);
183 assert_eq!(cargo_collaborators, &[]);
184
185 // When everyone has left the channel, the operations are collapsed into
186 // a new base text.
187 let buffer_response_b = db
188 .join_channel_buffer(zed_id, b_id, connection_id_b)
189 .await
190 .unwrap();
191 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
192 assert_eq!(buffer_response_b.operations, &[]);
193}
194
195test_both_dbs!(
196 test_channel_buffers_last_operations,
197 test_channel_buffers_last_operations_postgres,
198 test_channel_buffers_last_operations_sqlite
199);
200
201async fn test_channel_buffers_last_operations(db: &Database) {
202 let user_id = db
203 .create_user(
204 "user_a@example.com",
205 None,
206 false,
207 NewUserParams {
208 github_login: "user_a".into(),
209 github_user_id: 101,
210 },
211 )
212 .await
213 .unwrap()
214 .user_id;
215 let observer_id = db
216 .create_user(
217 "user_b@example.com",
218 None,
219 false,
220 NewUserParams {
221 github_login: "user_b".into(),
222 github_user_id: 102,
223 },
224 )
225 .await
226 .unwrap()
227 .user_id;
228 let owner_id = db.create_server("production").await.unwrap().0 as u32;
229 let connection_id = ConnectionId {
230 owner_id,
231 id: user_id.0 as u32,
232 };
233
234 let mut buffers = Vec::new();
235 let mut text_buffers = Vec::new();
236 for i in 0..3 {
237 let channel = db
238 .create_root_channel(&format!("channel-{i}"), user_id)
239 .await
240 .unwrap();
241
242 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
243 .await
244 .unwrap();
245 db.respond_to_channel_invite(channel, observer_id, true)
246 .await
247 .unwrap();
248
249 let res = db
250 .join_channel_buffer(channel, user_id, connection_id)
251 .await
252 .unwrap();
253
254 buffers.push(
255 db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
256 .await
257 .unwrap(),
258 );
259
260 text_buffers.push(Buffer::new(
261 ReplicaId::new(res.replica_id as u16),
262 text::BufferId::new(1).unwrap(),
263 "".to_string(),
264 &db.test_options.as_ref().unwrap().executor,
265 ));
266 }
267
268 update_buffer(
269 buffers[0].channel_id,
270 user_id,
271 db,
272 vec![
273 text_buffers[0].edit([(0..0, "a")], &db.test_options.as_ref().unwrap().executor),
274 text_buffers[0].edit([(0..0, "b")], &db.test_options.as_ref().unwrap().executor),
275 text_buffers[0].edit([(0..0, "c")], &db.test_options.as_ref().unwrap().executor),
276 ],
277 )
278 .await;
279
280 update_buffer(
281 buffers[1].channel_id,
282 user_id,
283 db,
284 vec![
285 text_buffers[1].edit([(0..0, "d")], &db.test_options.as_ref().unwrap().executor),
286 text_buffers[1].edit([(1..1, "e")], &db.test_options.as_ref().unwrap().executor),
287 text_buffers[1].edit([(2..2, "f")], &db.test_options.as_ref().unwrap().executor),
288 ],
289 )
290 .await;
291
292 // cause buffer 1's epoch to increment.
293 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
294 .await
295 .unwrap();
296 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
297 .await
298 .unwrap();
299 let replica_id = text_buffers[1].replica_id();
300 text_buffers[1] = Buffer::new(
301 replica_id,
302 text::BufferId::new(1).unwrap(),
303 "def".to_string(),
304 &db.test_options.as_ref().unwrap().executor,
305 );
306 update_buffer(
307 buffers[1].channel_id,
308 user_id,
309 db,
310 vec![
311 text_buffers[1].edit([(0..0, "g")], &db.test_options.as_ref().unwrap().executor),
312 text_buffers[1].edit([(0..0, "h")], &db.test_options.as_ref().unwrap().executor),
313 ],
314 )
315 .await;
316
317 update_buffer(
318 buffers[2].channel_id,
319 user_id,
320 db,
321 vec![text_buffers[2].edit([(0..0, "i")], &db.test_options.as_ref().unwrap().executor)],
322 )
323 .await;
324
325 let channels_for_user = db.get_channels_for_user(user_id).await.unwrap();
326
327 pretty_assertions::assert_eq!(
328 channels_for_user.latest_buffer_versions,
329 [
330 rpc::proto::ChannelBufferVersion {
331 channel_id: buffers[0].channel_id.to_proto(),
332 epoch: 0,
333 version: serialize_version(&text_buffers[0].version())
334 .into_iter()
335 .filter(
336 |vector| vector.replica_id == text_buffers[0].replica_id().as_u16() as u32
337 )
338 .collect::<Vec<_>>(),
339 },
340 rpc::proto::ChannelBufferVersion {
341 channel_id: buffers[1].channel_id.to_proto(),
342 epoch: 1,
343 version: serialize_version(&text_buffers[1].version())
344 .into_iter()
345 .filter(
346 |vector| vector.replica_id == text_buffers[1].replica_id().as_u16() as u32
347 )
348 .collect::<Vec<_>>(),
349 },
350 rpc::proto::ChannelBufferVersion {
351 channel_id: buffers[2].channel_id.to_proto(),
352 epoch: 0,
353 version: serialize_version(&text_buffers[2].version())
354 .into_iter()
355 .filter(
356 |vector| vector.replica_id == text_buffers[2].replica_id().as_u16() as u32
357 )
358 .collect::<Vec<_>>(),
359 },
360 ]
361 );
362}
363
364async fn update_buffer(
365 channel_id: ChannelId,
366 user_id: UserId,
367 db: &Database,
368 operations: Vec<text::Operation>,
369) {
370 let operations = operations
371 .into_iter()
372 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
373 .collect::<Vec<_>>();
374 db.update_channel_buffer(channel_id, user_id, &operations)
375 .await
376 .unwrap();
377}