buffer_tests.rs

use super::*;
use crate::test_both_dbs;
use language::proto::{self, serialize_version};
use text::Buffer;

test_both_dbs!(
    test_channel_buffers,
    test_channel_buffers_postgres,
    test_channel_buffers_sqlite
);

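// Exercises the channel-buffer lifecycle end to end: channel membership,
// syncing buffer operations between members, rejecting non-members,
// collaborator tracking, and collapsing operations into a new base text
// once every participant has left.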
async fn test_channel_buffers(db: &Arc<Database>) {
    let a_id = db
        .create_user(
            "user_a@example.com",
            false,
            NewUserParams {
                github_login: "user_a".into(),
                github_user_id: 101,
                invite_count: 0,
            },
        )
        .await
        .unwrap()
        .user_id;
    let b_id = db
        .create_user(
            "user_b@example.com",
            false,
            NewUserParams {
                github_login: "user_b".into(),
                github_user_id: 102,
                invite_count: 0,
            },
        )
        .await
        .unwrap()
        .user_id;

    // This user will not be a part of the channel
    let c_id = db
        .create_user(
            "user_c@example.com",
            false,
            NewUserParams {
                github_login: "user_c".into(),
                github_user_id: 103,
                invite_count: 0,
            },
        )
        .await
        .unwrap()
        .user_id;

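    // Create a server and a root channel owned by user A, then add user B as a member.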
    let owner_id = db.create_server("production").await.unwrap().0 as u32;

    let zed_id = db.create_root_channel("zed", a_id).await.unwrap();

    db.invite_channel_member(zed_id, b_id, a_id, false)
        .await
        .unwrap();

    db.respond_to_channel_invite(zed_id, b_id, true)
        .await
        .unwrap();

    let connection_id_a = ConnectionId { owner_id, id: 1 };
    let _ = db
        .join_channel_buffer(zed_id, a_id, connection_id_a)
        .await
        .unwrap();

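    // User A edits the buffer locally and pushes the resulting operations to the
    // channel buffer.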
    let mut buffer_a = Buffer::new(0, 0, "".to_string());
    let mut operations = Vec::new();
    operations.push(buffer_a.edit([(0..0, "hello world")]));
    operations.push(buffer_a.edit([(5..5, ", cruel")]));
    operations.push(buffer_a.edit([(0..5, "goodbye")]));
    operations.push(buffer_a.undo().unwrap().1);
    assert_eq!(buffer_a.text(), "hello, cruel world");

    let operations = operations
        .into_iter()
        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
        .collect::<Vec<_>>();

    db.update_channel_buffer(zed_id, a_id, &operations)
        .await
        .unwrap();

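    // User B joins and reconstructs the buffer from the stored base text and
    // serialized operations.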
    let connection_id_b = ConnectionId { owner_id, id: 2 };
    let buffer_response_b = db
        .join_channel_buffer(zed_id, b_id, connection_id_b)
        .await
        .unwrap();

    let mut buffer_b = Buffer::new(0, 0, buffer_response_b.base_text);
    buffer_b
        .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
            let operation = proto::deserialize_operation(operation).unwrap();
            if let language::Operation::Buffer(operation) = operation {
                operation
            } else {
                unreachable!()
            }
        }))
        .unwrap();

    assert_eq!(buffer_b.text(), "hello, cruel world");

    // Ensure that C fails to open the buffer
    assert!(db
        .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
        .await
        .is_err());

    // Ensure that both collaborators have shown up
    assert_eq!(
        buffer_response_b.collaborators,
        &[
            rpc::proto::Collaborator {
                user_id: a_id.to_proto(),
                peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
                replica_id: 0,
            },
            rpc::proto::Collaborator {
                user_id: b_id.to_proto(),
                peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
                replica_id: 1,
            }
        ]
    );

    // Ensure that get_channel_buffer_collaborators works
    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
    assert_eq!(zed_collaborators, &[a_id, b_id]);

    let left_buffer = db
        .leave_channel_buffer(zed_id, connection_id_b)
        .await
        .unwrap();

    assert_eq!(left_buffer.connections, &[connection_id_a]);

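    // Joining a second channel buffer and then dropping all of A's buffer
    // connections removes A from the collaborator lists of both channels.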
    let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
    let _ = db
        .join_channel_buffer(cargo_id, a_id, connection_id_a)
        .await
        .unwrap();

    db.leave_channel_buffers(connection_id_a).await.unwrap();

    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
    let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
    assert_eq!(zed_collaborators, &[]);
    assert_eq!(cargo_collaborators, &[]);

    // When everyone has left the channel, the operations are collapsed into
    // a new base text.
    let buffer_response_b = db
        .join_channel_buffer(zed_id, b_id, connection_id_b)
        .await
        .unwrap();
    assert_eq!(buffer_response_b.base_text, "hello, cruel world");
    assert_eq!(buffer_response_b.operations, &[]);
}

test_both_dbs!(
    test_channel_buffers_last_operations,
    test_channel_buffers_last_operations_postgres,
    test_channel_buffers_last_operations_sqlite
);

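// Verifies bookkeeping of per-buffer "latest operation" rows and unseen-change
// tracking across buffer epochs: edits, leaving and rejoining (which bumps the
// epoch), and an observer acknowledging the versions it has seen.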
async fn test_channel_buffers_last_operations(db: &Database) {
    let user_id = db
        .create_user(
            "user_a@example.com",
            false,
            NewUserParams {
                github_login: "user_a".into(),
                github_user_id: 101,
                invite_count: 0,
            },
        )
        .await
        .unwrap()
        .user_id;
    let observer_id = db
        .create_user(
            "user_b@example.com",
            false,
            NewUserParams {
                github_login: "user_b".into(),
                github_user_id: 102,
                invite_count: 0,
            },
        )
        .await
        .unwrap()
        .user_id;
    let owner_id = db.create_server("production").await.unwrap().0 as u32;
    let connection_id = ConnectionId {
        owner_id,
        id: user_id.0 as u32,
    };

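    // Create three channels, each with its own channel buffer, and keep a local
    // text::Buffer alongside each one to mirror the edits made below.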
    let mut buffers = Vec::new();
    let mut text_buffers = Vec::new();
    for i in 0..3 {
        let channel = db
            .create_root_channel(&format!("channel-{i}"), user_id)
            .await
            .unwrap();

        db.invite_channel_member(channel, observer_id, user_id, false)
            .await
            .unwrap();
        db.respond_to_channel_invite(channel, observer_id, true)
            .await
            .unwrap();

        db.join_channel_buffer(channel, user_id, connection_id)
            .await
            .unwrap();

        buffers.push(
            db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
                .await
                .unwrap(),
        );

        text_buffers.push(Buffer::new(0, 0, "".to_string()));
    }

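    // No edits have been made yet, so there are no latest operations to report.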
    let operations = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
                    .await
            }
        })
        .await
        .unwrap();

    assert!(operations.is_empty());

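    // Edit buffers 0 and 1 so each accumulates operations within its initial epoch.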
    update_buffer(
        buffers[0].channel_id,
        user_id,
        db,
        vec![
            text_buffers[0].edit([(0..0, "a")]),
            text_buffers[0].edit([(0..0, "b")]),
            text_buffers[0].edit([(0..0, "c")]),
        ],
    )
    .await;

    update_buffer(
        buffers[1].channel_id,
        user_id,
        db,
        vec![
            text_buffers[1].edit([(0..0, "d")]),
            text_buffers[1].edit([(1..1, "e")]),
            text_buffers[1].edit([(2..2, "f")]),
        ],
    )
    .await;

    // Leaving and rejoining the buffer causes buffer 1's epoch to increment.
    db.leave_channel_buffer(buffers[1].channel_id, connection_id)
        .await
        .unwrap();
    db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
        .await
        .unwrap();
    text_buffers[1] = Buffer::new(1, 0, "def".to_string());
    update_buffer(
        buffers[1].channel_id,
        user_id,
        db,
        vec![
            text_buffers[1].edit([(0..0, "g")]),
            text_buffers[1].edit([(0..0, "h")]),
        ],
    )
    .await;

    update_buffer(
        buffers[2].channel_id,
        user_id,
        db,
        vec![text_buffers[2].edit([(0..0, "i")])],
    )
    .await;

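    // The latest operation recorded for each buffer should reflect its current
    // epoch and the timestamp of its most recent edit.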
    let operations = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
                    .await
            }
        })
        .await
        .unwrap();
    assert_operations(
        &operations,
        &[
            (buffers[1].id, 1, &text_buffers[1]),
            (buffers[2].id, 0, &text_buffers[2]),
        ],
    );

    let operations = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
                    .await
            }
        })
        .await
        .unwrap();
    assert_operations(
        &operations,
        &[
            (buffers[0].id, 0, &text_buffers[0]),
            (buffers[1].id, 1, &text_buffers[1]),
        ],
    );

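    // The observer has not acknowledged any versions yet, so all three buffers
    // are reported as having unseen changes.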
    let buffer_changes = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.unseen_channel_buffer_changes(
                    observer_id,
                    &[
                        buffers[0].channel_id,
                        buffers[1].channel_id,
                        buffers[2].channel_id,
                    ],
                    &*tx,
                )
                .await
            }
        })
        .await
        .unwrap();

    pretty_assertions::assert_eq!(
        buffer_changes,
        [
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[0].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[0].version()),
            },
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[1].channel_id.to_proto(),
                epoch: 1,
                version: serialize_version(&text_buffers[1].version())
                    .into_iter()
                    .filter(|vector| vector.replica_id
                        == buffer_changes[1].version.first().unwrap().replica_id)
                    .collect::<Vec<_>>(),
            },
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[2].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[2].version()),
            },
        ]
    );

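    // Once the observer acknowledges buffer 1's latest version, only buffers 0
    // and 2 remain unseen.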
    db.observe_buffer_version(
        buffers[1].id,
        observer_id,
        1,
        serialize_version(&text_buffers[1].version()).as_slice(),
    )
    .await
    .unwrap();

    let buffer_changes = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.unseen_channel_buffer_changes(
                    observer_id,
                    &[
                        buffers[0].channel_id,
                        buffers[1].channel_id,
                        buffers[2].channel_id,
                    ],
                    &*tx,
                )
                .await
            }
        })
        .await
        .unwrap();

    assert_eq!(
        buffer_changes,
        [
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[0].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[0].version()),
            },
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[2].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[2].version()),
            },
        ]
    );

    // Observe an earlier version of the buffer; this must not regress the
    // version the observer has already acknowledged.
    db.observe_buffer_version(
        buffers[1].id,
        observer_id,
        1,
        &[rpc::proto::VectorClockEntry {
            replica_id: 0,
            timestamp: 0,
        }],
    )
    .await
    .unwrap();

    let buffer_changes = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.unseen_channel_buffer_changes(
                    observer_id,
                    &[
                        buffers[0].channel_id,
                        buffers[1].channel_id,
                        buffers[2].channel_id,
                    ],
                    &*tx,
                )
                .await
            }
        })
        .await
        .unwrap();

    assert_eq!(
        buffer_changes,
        [
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[0].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[0].version()),
            },
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[2].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[2].version()),
            },
        ]
    );
}

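// Serializes the given text operations and applies them to the channel's buffer.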
async fn update_buffer(
    channel_id: ChannelId,
    user_id: UserId,
    db: &Database,
    operations: Vec<text::Operation>,
) {
    let operations = operations
        .into_iter()
        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
        .collect::<Vec<_>>();
    db.update_channel_buffer(channel_id, user_id, &operations)
        .await
        .unwrap();
}

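// Asserts that the stored operation rows match the expected (buffer id, epoch,
// buffer state) triples, ignoring the serialized operation payloads.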
fn assert_operations(
    operations: &[buffer_operation::Model],
    expected: &[(BufferId, i32, &text::Buffer)],
) {
    let actual = operations
        .iter()
        .map(|op| buffer_operation::Model {
            buffer_id: op.buffer_id,
            epoch: op.epoch,
            lamport_timestamp: op.lamport_timestamp,
            replica_id: op.replica_id,
            value: vec![],
        })
        .collect::<Vec<_>>();
    let expected = expected
        .iter()
        .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
            buffer_id: *buffer_id,
            epoch: *epoch,
            lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
            replica_id: buffer.replica_id() as i32,
            value: vec![],
        })
        .collect::<Vec<_>>();
    assert_eq!(actual, expected, "unexpected operations");
}