1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use text::Buffer;
5
6test_both_dbs!(
7 test_channel_buffers,
8 test_channel_buffers_postgres,
9 test_channel_buffers_sqlite
10);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 false,
17 NewUserParams {
18 github_login: "user_a".into(),
19 github_user_id: 101,
20 invite_count: 0,
21 },
22 )
23 .await
24 .unwrap()
25 .user_id;
26 let b_id = db
27 .create_user(
28 "user_b@example.com",
29 false,
30 NewUserParams {
31 github_login: "user_b".into(),
32 github_user_id: 102,
33 invite_count: 0,
34 },
35 )
36 .await
37 .unwrap()
38 .user_id;
39
40 // This user will not be a part of the channel
41 let c_id = db
42 .create_user(
43 "user_c@example.com",
44 false,
45 NewUserParams {
46 github_login: "user_c".into(),
47 github_user_id: 102,
48 invite_count: 0,
49 },
50 )
51 .await
52 .unwrap()
53 .user_id;
54
55 let owner_id = db.create_server("production").await.unwrap().0 as u32;
56
57 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
58
59 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
60 .await
61 .unwrap();
62
63 db.respond_to_channel_invite(zed_id, b_id, true)
64 .await
65 .unwrap();
66
67 let connection_id_a = ConnectionId { owner_id, id: 1 };
68 let _ = db
69 .join_channel_buffer(zed_id, a_id, connection_id_a)
70 .await
71 .unwrap();
72
73 let mut buffer_a = Buffer::new(0, 0, "".to_string());
74 let mut operations = Vec::new();
75 operations.push(buffer_a.edit([(0..0, "hello world")]));
76 operations.push(buffer_a.edit([(5..5, ", cruel")]));
77 operations.push(buffer_a.edit([(0..5, "goodbye")]));
78 operations.push(buffer_a.undo().unwrap().1);
79 assert_eq!(buffer_a.text(), "hello, cruel world");
80
81 let operations = operations
82 .into_iter()
83 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
84 .collect::<Vec<_>>();
85
86 db.update_channel_buffer(zed_id, a_id, &operations)
87 .await
88 .unwrap();
89
90 let connection_id_b = ConnectionId { owner_id, id: 2 };
91 let buffer_response_b = db
92 .join_channel_buffer(zed_id, b_id, connection_id_b)
93 .await
94 .unwrap();
95
96 let mut buffer_b = Buffer::new(0, 0, buffer_response_b.base_text);
97 buffer_b
98 .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
99 let operation = proto::deserialize_operation(operation).unwrap();
100 if let language::Operation::Buffer(operation) = operation {
101 operation
102 } else {
103 unreachable!()
104 }
105 }))
106 .unwrap();
107
108 assert_eq!(buffer_b.text(), "hello, cruel world");
109
110 // Ensure that C fails to open the buffer
111 assert!(db
112 .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
113 .await
114 .is_err());
115
116 // Ensure that both collaborators have shown up
117 assert_eq!(
118 buffer_response_b.collaborators,
119 &[
120 rpc::proto::Collaborator {
121 user_id: a_id.to_proto(),
122 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
123 replica_id: 0,
124 },
125 rpc::proto::Collaborator {
126 user_id: b_id.to_proto(),
127 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
128 replica_id: 1,
129 }
130 ]
131 );
132
133 // Ensure that get_channel_buffer_collaborators works
134 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
135 assert_eq!(zed_collaborats, &[a_id, b_id]);
136
137 let left_buffer = db
138 .leave_channel_buffer(zed_id, connection_id_b)
139 .await
140 .unwrap();
141
142 assert_eq!(left_buffer.connections, &[connection_id_a],);
143
144 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
145 let _ = db
146 .join_channel_buffer(cargo_id, a_id, connection_id_a)
147 .await
148 .unwrap();
149
150 db.leave_channel_buffers(connection_id_a).await.unwrap();
151
152 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
153 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
154 assert_eq!(zed_collaborators, &[]);
155 assert_eq!(cargo_collaborators, &[]);
156
157 // When everyone has left the channel, the operations are collapsed into
158 // a new base text.
159 let buffer_response_b = db
160 .join_channel_buffer(zed_id, b_id, connection_id_b)
161 .await
162 .unwrap();
163 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
164 assert_eq!(buffer_response_b.operations, &[]);
165}
166
167test_both_dbs!(
168 test_channel_buffers_last_operations,
169 test_channel_buffers_last_operations_postgres,
170 test_channel_buffers_last_operations_sqlite
171);
172
173async fn test_channel_buffers_last_operations(db: &Database) {
174 let user_id = db
175 .create_user(
176 "user_a@example.com",
177 false,
178 NewUserParams {
179 github_login: "user_a".into(),
180 github_user_id: 101,
181 invite_count: 0,
182 },
183 )
184 .await
185 .unwrap()
186 .user_id;
187 let observer_id = db
188 .create_user(
189 "user_b@example.com",
190 false,
191 NewUserParams {
192 github_login: "user_b".into(),
193 github_user_id: 102,
194 invite_count: 0,
195 },
196 )
197 .await
198 .unwrap()
199 .user_id;
200 let owner_id = db.create_server("production").await.unwrap().0 as u32;
201 let connection_id = ConnectionId {
202 owner_id,
203 id: user_id.0 as u32,
204 };
205
206 let mut buffers = Vec::new();
207 let mut text_buffers = Vec::new();
208 for i in 0..3 {
209 let channel = db
210 .create_root_channel(&format!("channel-{i}"), user_id)
211 .await
212 .unwrap();
213
214 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
215 .await
216 .unwrap();
217 db.respond_to_channel_invite(channel, observer_id, true)
218 .await
219 .unwrap();
220
221 db.join_channel_buffer(channel, user_id, connection_id)
222 .await
223 .unwrap();
224
225 buffers.push(
226 db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
227 .await
228 .unwrap(),
229 );
230
231 text_buffers.push(Buffer::new(0, 0, "".to_string()));
232 }
233
234 let operations = db
235 .transaction(|tx| {
236 let buffers = &buffers;
237 async move {
238 db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
239 .await
240 }
241 })
242 .await
243 .unwrap();
244
245 assert!(operations.is_empty());
246
247 update_buffer(
248 buffers[0].channel_id,
249 user_id,
250 db,
251 vec![
252 text_buffers[0].edit([(0..0, "a")]),
253 text_buffers[0].edit([(0..0, "b")]),
254 text_buffers[0].edit([(0..0, "c")]),
255 ],
256 )
257 .await;
258
259 update_buffer(
260 buffers[1].channel_id,
261 user_id,
262 db,
263 vec![
264 text_buffers[1].edit([(0..0, "d")]),
265 text_buffers[1].edit([(1..1, "e")]),
266 text_buffers[1].edit([(2..2, "f")]),
267 ],
268 )
269 .await;
270
271 // cause buffer 1's epoch to increment.
272 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
273 .await
274 .unwrap();
275 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
276 .await
277 .unwrap();
278 text_buffers[1] = Buffer::new(1, 0, "def".to_string());
279 update_buffer(
280 buffers[1].channel_id,
281 user_id,
282 db,
283 vec![
284 text_buffers[1].edit([(0..0, "g")]),
285 text_buffers[1].edit([(0..0, "h")]),
286 ],
287 )
288 .await;
289
290 update_buffer(
291 buffers[2].channel_id,
292 user_id,
293 db,
294 vec![text_buffers[2].edit([(0..0, "i")])],
295 )
296 .await;
297
298 let operations = db
299 .transaction(|tx| {
300 let buffers = &buffers;
301 async move {
302 db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
303 .await
304 }
305 })
306 .await
307 .unwrap();
308 assert_operations(
309 &operations,
310 &[
311 (buffers[1].id, 1, &text_buffers[1]),
312 (buffers[2].id, 0, &text_buffers[2]),
313 ],
314 );
315
316 let operations = db
317 .transaction(|tx| {
318 let buffers = &buffers;
319 async move {
320 db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
321 .await
322 }
323 })
324 .await
325 .unwrap();
326 assert_operations(
327 &operations,
328 &[
329 (buffers[0].id, 0, &text_buffers[0]),
330 (buffers[1].id, 1, &text_buffers[1]),
331 ],
332 );
333
334 let buffer_changes = db
335 .transaction(|tx| {
336 let buffers = &buffers;
337 async move {
338 db.unseen_channel_buffer_changes(
339 observer_id,
340 &[
341 buffers[0].channel_id,
342 buffers[1].channel_id,
343 buffers[2].channel_id,
344 ],
345 &*tx,
346 )
347 .await
348 }
349 })
350 .await
351 .unwrap();
352
353 pretty_assertions::assert_eq!(
354 buffer_changes,
355 [
356 rpc::proto::UnseenChannelBufferChange {
357 channel_id: buffers[0].channel_id.to_proto(),
358 epoch: 0,
359 version: serialize_version(&text_buffers[0].version()),
360 },
361 rpc::proto::UnseenChannelBufferChange {
362 channel_id: buffers[1].channel_id.to_proto(),
363 epoch: 1,
364 version: serialize_version(&text_buffers[1].version())
365 .into_iter()
366 .filter(|vector| vector.replica_id
367 == buffer_changes[1].version.first().unwrap().replica_id)
368 .collect::<Vec<_>>(),
369 },
370 rpc::proto::UnseenChannelBufferChange {
371 channel_id: buffers[2].channel_id.to_proto(),
372 epoch: 0,
373 version: serialize_version(&text_buffers[2].version()),
374 },
375 ]
376 );
377
378 db.observe_buffer_version(
379 buffers[1].id,
380 observer_id,
381 1,
382 serialize_version(&text_buffers[1].version()).as_slice(),
383 )
384 .await
385 .unwrap();
386
387 let buffer_changes = db
388 .transaction(|tx| {
389 let buffers = &buffers;
390 async move {
391 db.unseen_channel_buffer_changes(
392 observer_id,
393 &[
394 buffers[0].channel_id,
395 buffers[1].channel_id,
396 buffers[2].channel_id,
397 ],
398 &*tx,
399 )
400 .await
401 }
402 })
403 .await
404 .unwrap();
405
406 assert_eq!(
407 buffer_changes,
408 [
409 rpc::proto::UnseenChannelBufferChange {
410 channel_id: buffers[0].channel_id.to_proto(),
411 epoch: 0,
412 version: serialize_version(&text_buffers[0].version()),
413 },
414 rpc::proto::UnseenChannelBufferChange {
415 channel_id: buffers[2].channel_id.to_proto(),
416 epoch: 0,
417 version: serialize_version(&text_buffers[2].version()),
418 },
419 ]
420 );
421
422 // Observe an earlier version of the buffer.
423 db.observe_buffer_version(
424 buffers[1].id,
425 observer_id,
426 1,
427 &[rpc::proto::VectorClockEntry {
428 replica_id: 0,
429 timestamp: 0,
430 }],
431 )
432 .await
433 .unwrap();
434
435 let buffer_changes = db
436 .transaction(|tx| {
437 let buffers = &buffers;
438 async move {
439 db.unseen_channel_buffer_changes(
440 observer_id,
441 &[
442 buffers[0].channel_id,
443 buffers[1].channel_id,
444 buffers[2].channel_id,
445 ],
446 &*tx,
447 )
448 .await
449 }
450 })
451 .await
452 .unwrap();
453
454 assert_eq!(
455 buffer_changes,
456 [
457 rpc::proto::UnseenChannelBufferChange {
458 channel_id: buffers[0].channel_id.to_proto(),
459 epoch: 0,
460 version: serialize_version(&text_buffers[0].version()),
461 },
462 rpc::proto::UnseenChannelBufferChange {
463 channel_id: buffers[2].channel_id.to_proto(),
464 epoch: 0,
465 version: serialize_version(&text_buffers[2].version()),
466 },
467 ]
468 );
469}
470
471async fn update_buffer(
472 channel_id: ChannelId,
473 user_id: UserId,
474 db: &Database,
475 operations: Vec<text::Operation>,
476) {
477 let operations = operations
478 .into_iter()
479 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
480 .collect::<Vec<_>>();
481 db.update_channel_buffer(channel_id, user_id, &operations)
482 .await
483 .unwrap();
484}
485
486fn assert_operations(
487 operations: &[buffer_operation::Model],
488 expected: &[(BufferId, i32, &text::Buffer)],
489) {
490 let actual = operations
491 .iter()
492 .map(|op| buffer_operation::Model {
493 buffer_id: op.buffer_id,
494 epoch: op.epoch,
495 lamport_timestamp: op.lamport_timestamp,
496 replica_id: op.replica_id,
497 value: vec![],
498 })
499 .collect::<Vec<_>>();
500 let expected = expected
501 .iter()
502 .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
503 buffer_id: *buffer_id,
504 epoch: *epoch,
505 lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
506 replica_id: buffer.replica_id() as i32,
507 value: vec![],
508 })
509 .collect::<Vec<_>>();
510 assert_eq!(actual, expected, "unexpected operations")
511}