1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use text::Buffer;
5
6test_both_dbs!(
7 test_channel_buffers,
8 test_channel_buffers_postgres,
9 test_channel_buffers_sqlite
10);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 false,
17 NewUserParams {
18 github_login: "user_a".into(),
19 github_user_id: 101,
20 },
21 )
22 .await
23 .unwrap()
24 .user_id;
25 let b_id = db
26 .create_user(
27 "user_b@example.com",
28 false,
29 NewUserParams {
30 github_login: "user_b".into(),
31 github_user_id: 102,
32 },
33 )
34 .await
35 .unwrap()
36 .user_id;
37
38 // This user will not be a part of the channel
39 let c_id = db
40 .create_user(
41 "user_c@example.com",
42 false,
43 NewUserParams {
44 github_login: "user_c".into(),
45 github_user_id: 102,
46 },
47 )
48 .await
49 .unwrap()
50 .user_id;
51
52 let owner_id = db.create_server("production").await.unwrap().0 as u32;
53
54 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
55
56 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
57 .await
58 .unwrap();
59
60 db.respond_to_channel_invite(zed_id, b_id, true)
61 .await
62 .unwrap();
63
64 let connection_id_a = ConnectionId { owner_id, id: 1 };
65 let _ = db
66 .join_channel_buffer(zed_id, a_id, connection_id_a)
67 .await
68 .unwrap();
69
70 let mut buffer_a = Buffer::new(0, text::BufferId::new(1).unwrap(), "".to_string());
71 let operations = vec![
72 buffer_a.edit([(0..0, "hello world")]),
73 buffer_a.edit([(5..5, ", cruel")]),
74 buffer_a.edit([(0..5, "goodbye")]),
75 buffer_a.undo().unwrap().1,
76 ];
77 assert_eq!(buffer_a.text(), "hello, cruel world");
78
79 let operations = operations
80 .into_iter()
81 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
82 .collect::<Vec<_>>();
83
84 db.update_channel_buffer(zed_id, a_id, &operations)
85 .await
86 .unwrap();
87
88 let connection_id_b = ConnectionId { owner_id, id: 2 };
89 let buffer_response_b = db
90 .join_channel_buffer(zed_id, b_id, connection_id_b)
91 .await
92 .unwrap();
93
94 let mut buffer_b = Buffer::new(
95 0,
96 text::BufferId::new(1).unwrap(),
97 buffer_response_b.base_text,
98 );
99 buffer_b
100 .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
101 let operation = proto::deserialize_operation(operation).unwrap();
102 if let language::Operation::Buffer(operation) = operation {
103 operation
104 } else {
105 unreachable!()
106 }
107 }))
108 .unwrap();
109
110 assert_eq!(buffer_b.text(), "hello, cruel world");
111
112 // Ensure that C fails to open the buffer
113 assert!(db
114 .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
115 .await
116 .is_err());
117
118 // Ensure that both collaborators have shown up
119 assert_eq!(
120 buffer_response_b.collaborators,
121 &[
122 rpc::proto::Collaborator {
123 user_id: a_id.to_proto(),
124 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
125 replica_id: 0,
126 },
127 rpc::proto::Collaborator {
128 user_id: b_id.to_proto(),
129 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
130 replica_id: 1,
131 }
132 ]
133 );
134
135 // Ensure that get_channel_buffer_collaborators works
136 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
137 assert_eq!(zed_collaborats, &[a_id, b_id]);
138
139 let left_buffer = db
140 .leave_channel_buffer(zed_id, connection_id_b)
141 .await
142 .unwrap();
143
144 assert_eq!(left_buffer.connections, &[connection_id_a],);
145
146 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
147 let _ = db
148 .join_channel_buffer(cargo_id, a_id, connection_id_a)
149 .await
150 .unwrap();
151
152 db.leave_channel_buffers(connection_id_a).await.unwrap();
153
154 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
155 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
156 assert_eq!(zed_collaborators, &[]);
157 assert_eq!(cargo_collaborators, &[]);
158
159 // When everyone has left the channel, the operations are collapsed into
160 // a new base text.
161 let buffer_response_b = db
162 .join_channel_buffer(zed_id, b_id, connection_id_b)
163 .await
164 .unwrap();
165 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
166 assert_eq!(buffer_response_b.operations, &[]);
167}
168
169test_both_dbs!(
170 test_channel_buffers_last_operations,
171 test_channel_buffers_last_operations_postgres,
172 test_channel_buffers_last_operations_sqlite
173);
174
175async fn test_channel_buffers_last_operations(db: &Database) {
176 let user_id = db
177 .create_user(
178 "user_a@example.com",
179 false,
180 NewUserParams {
181 github_login: "user_a".into(),
182 github_user_id: 101,
183 },
184 )
185 .await
186 .unwrap()
187 .user_id;
188 let observer_id = db
189 .create_user(
190 "user_b@example.com",
191 false,
192 NewUserParams {
193 github_login: "user_b".into(),
194 github_user_id: 102,
195 },
196 )
197 .await
198 .unwrap()
199 .user_id;
200 let owner_id = db.create_server("production").await.unwrap().0 as u32;
201 let connection_id = ConnectionId {
202 owner_id,
203 id: user_id.0 as u32,
204 };
205
206 let mut buffers = Vec::new();
207 let mut text_buffers = Vec::new();
208 for i in 0..3 {
209 let channel = db
210 .create_root_channel(&format!("channel-{i}"), user_id)
211 .await
212 .unwrap();
213
214 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
215 .await
216 .unwrap();
217 db.respond_to_channel_invite(channel, observer_id, true)
218 .await
219 .unwrap();
220
221 db.join_channel_buffer(channel, user_id, connection_id)
222 .await
223 .unwrap();
224
225 buffers.push(
226 db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
227 .await
228 .unwrap(),
229 );
230
231 text_buffers.push(Buffer::new(
232 0,
233 text::BufferId::new(1).unwrap(),
234 "".to_string(),
235 ));
236 }
237
238 let operations = db
239 .transaction(|tx| {
240 let buffers = &buffers;
241 async move {
242 db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &tx)
243 .await
244 }
245 })
246 .await
247 .unwrap();
248
249 assert!(operations.is_empty());
250
251 update_buffer(
252 buffers[0].channel_id,
253 user_id,
254 db,
255 vec![
256 text_buffers[0].edit([(0..0, "a")]),
257 text_buffers[0].edit([(0..0, "b")]),
258 text_buffers[0].edit([(0..0, "c")]),
259 ],
260 )
261 .await;
262
263 update_buffer(
264 buffers[1].channel_id,
265 user_id,
266 db,
267 vec![
268 text_buffers[1].edit([(0..0, "d")]),
269 text_buffers[1].edit([(1..1, "e")]),
270 text_buffers[1].edit([(2..2, "f")]),
271 ],
272 )
273 .await;
274
275 // cause buffer 1's epoch to increment.
276 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
277 .await
278 .unwrap();
279 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
280 .await
281 .unwrap();
282 text_buffers[1] = Buffer::new(1, text::BufferId::new(1).unwrap(), "def".to_string());
283 update_buffer(
284 buffers[1].channel_id,
285 user_id,
286 db,
287 vec![
288 text_buffers[1].edit([(0..0, "g")]),
289 text_buffers[1].edit([(0..0, "h")]),
290 ],
291 )
292 .await;
293
294 update_buffer(
295 buffers[2].channel_id,
296 user_id,
297 db,
298 vec![text_buffers[2].edit([(0..0, "i")])],
299 )
300 .await;
301
302 let operations = db
303 .transaction(|tx| {
304 let buffers = &buffers;
305 async move {
306 db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &tx)
307 .await
308 }
309 })
310 .await
311 .unwrap();
312 assert_operations(
313 &operations,
314 &[
315 (buffers[1].id, 1, &text_buffers[1]),
316 (buffers[2].id, 0, &text_buffers[2]),
317 ],
318 );
319
320 let operations = db
321 .transaction(|tx| {
322 let buffers = &buffers;
323 async move {
324 db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &tx)
325 .await
326 }
327 })
328 .await
329 .unwrap();
330 assert_operations(
331 &operations,
332 &[
333 (buffers[0].id, 0, &text_buffers[0]),
334 (buffers[1].id, 1, &text_buffers[1]),
335 ],
336 );
337
338 let buffer_changes = db
339 .transaction(|tx| {
340 let buffers = &buffers;
341 let mut hash = HashMap::default();
342 hash.insert(buffers[0].id, buffers[0].channel_id);
343 hash.insert(buffers[1].id, buffers[1].channel_id);
344 hash.insert(buffers[2].id, buffers[2].channel_id);
345
346 async move { db.latest_channel_buffer_changes(&hash, &tx).await }
347 })
348 .await
349 .unwrap();
350
351 pretty_assertions::assert_eq!(
352 buffer_changes,
353 [
354 rpc::proto::ChannelBufferVersion {
355 channel_id: buffers[0].channel_id.to_proto(),
356 epoch: 0,
357 version: serialize_version(&text_buffers[0].version()),
358 },
359 rpc::proto::ChannelBufferVersion {
360 channel_id: buffers[1].channel_id.to_proto(),
361 epoch: 1,
362 version: serialize_version(&text_buffers[1].version())
363 .into_iter()
364 .filter(|vector| vector.replica_id
365 == buffer_changes[1].version.first().unwrap().replica_id)
366 .collect::<Vec<_>>(),
367 },
368 rpc::proto::ChannelBufferVersion {
369 channel_id: buffers[2].channel_id.to_proto(),
370 epoch: 0,
371 version: serialize_version(&text_buffers[2].version()),
372 },
373 ]
374 );
375}
376
377async fn update_buffer(
378 channel_id: ChannelId,
379 user_id: UserId,
380 db: &Database,
381 operations: Vec<text::Operation>,
382) {
383 let operations = operations
384 .into_iter()
385 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
386 .collect::<Vec<_>>();
387 db.update_channel_buffer(channel_id, user_id, &operations)
388 .await
389 .unwrap();
390}
391
392fn assert_operations(
393 operations: &[buffer_operation::Model],
394 expected: &[(BufferId, i32, &text::Buffer)],
395) {
396 let actual = operations
397 .iter()
398 .map(|op| buffer_operation::Model {
399 buffer_id: op.buffer_id,
400 epoch: op.epoch,
401 lamport_timestamp: op.lamport_timestamp,
402 replica_id: op.replica_id,
403 value: vec![],
404 })
405 .collect::<Vec<_>>();
406 let expected = expected
407 .iter()
408 .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
409 buffer_id: *buffer_id,
410 epoch: *epoch,
411 lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
412 replica_id: buffer.replica_id() as i32,
413 value: vec![],
414 })
415 .collect::<Vec<_>>();
416 assert_eq!(actual, expected, "unexpected operations")
417}