buffer_tests.rs

  1use super::*;
  2use crate::test_both_dbs;
  3use language::proto::{self, serialize_version};
  4use text::Buffer;
  5
// Registers `test_channel_buffers` through the `test_both_dbs!` macro, which is defined
// elsewhere in the crate. The second and third identifiers are presumably the names of the
// generated Postgres and SQLite test cases -- confirm against the macro definition.
test_both_dbs!(
    test_channel_buffers,
    test_channel_buffers_postgres,
    test_channel_buffers_sqlite
);
 11
 12async fn test_channel_buffers(db: &Arc<Database>) {
 13    let a_id = db
 14        .create_user(
 15            "user_a@example.com",
 16            false,
 17            NewUserParams {
 18                github_login: "user_a".into(),
 19                github_user_id: 101,
 20            },
 21        )
 22        .await
 23        .unwrap()
 24        .user_id;
 25    let b_id = db
 26        .create_user(
 27            "user_b@example.com",
 28            false,
 29            NewUserParams {
 30                github_login: "user_b".into(),
 31                github_user_id: 102,
 32            },
 33        )
 34        .await
 35        .unwrap()
 36        .user_id;
 37
 38    // This user will not be a part of the channel
 39    let c_id = db
 40        .create_user(
 41            "user_c@example.com",
 42            false,
 43            NewUserParams {
 44                github_login: "user_c".into(),
 45                github_user_id: 102,
 46            },
 47        )
 48        .await
 49        .unwrap()
 50        .user_id;
 51
 52    let owner_id = db.create_server("production").await.unwrap().0 as u32;
 53
 54    let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
 55
 56    db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
 57        .await
 58        .unwrap();
 59
 60    db.respond_to_channel_invite(zed_id, b_id, true)
 61        .await
 62        .unwrap();
 63
 64    let connection_id_a = ConnectionId { owner_id, id: 1 };
 65    let _ = db
 66        .join_channel_buffer(zed_id, a_id, connection_id_a)
 67        .await
 68        .unwrap();
 69
 70    let mut buffer_a = Buffer::new(0, 0, "".to_string());
 71    let mut operations = Vec::new();
 72    operations.push(buffer_a.edit([(0..0, "hello world")]));
 73    operations.push(buffer_a.edit([(5..5, ", cruel")]));
 74    operations.push(buffer_a.edit([(0..5, "goodbye")]));
 75    operations.push(buffer_a.undo().unwrap().1);
 76    assert_eq!(buffer_a.text(), "hello, cruel world");
 77
 78    let operations = operations
 79        .into_iter()
 80        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
 81        .collect::<Vec<_>>();
 82
 83    db.update_channel_buffer(zed_id, a_id, &operations)
 84        .await
 85        .unwrap();
 86
 87    let connection_id_b = ConnectionId { owner_id, id: 2 };
 88    let buffer_response_b = db
 89        .join_channel_buffer(zed_id, b_id, connection_id_b)
 90        .await
 91        .unwrap();
 92
 93    let mut buffer_b = Buffer::new(0, 0, buffer_response_b.base_text);
 94    buffer_b
 95        .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
 96            let operation = proto::deserialize_operation(operation).unwrap();
 97            if let language::Operation::Buffer(operation) = operation {
 98                operation
 99            } else {
100                unreachable!()
101            }
102        }))
103        .unwrap();
104
105    assert_eq!(buffer_b.text(), "hello, cruel world");
106
107    // Ensure that C fails to open the buffer
108    assert!(db
109        .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
110        .await
111        .is_err());
112
113    // Ensure that both collaborators have shown up
114    assert_eq!(
115        buffer_response_b.collaborators,
116        &[
117            rpc::proto::Collaborator {
118                user_id: a_id.to_proto(),
119                peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
120                replica_id: 0,
121            },
122            rpc::proto::Collaborator {
123                user_id: b_id.to_proto(),
124                peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
125                replica_id: 1,
126            }
127        ]
128    );
129
130    // Ensure that get_channel_buffer_collaborators works
131    let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
132    assert_eq!(zed_collaborats, &[a_id, b_id]);
133
134    let left_buffer = db
135        .leave_channel_buffer(zed_id, connection_id_b)
136        .await
137        .unwrap();
138
139    assert_eq!(left_buffer.connections, &[connection_id_a],);
140
141    let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
142    let _ = db
143        .join_channel_buffer(cargo_id, a_id, connection_id_a)
144        .await
145        .unwrap();
146
147    db.leave_channel_buffers(connection_id_a).await.unwrap();
148
149    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
150    let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
151    assert_eq!(zed_collaborators, &[]);
152    assert_eq!(cargo_collaborators, &[]);
153
154    // When everyone has left the channel, the operations are collapsed into
155    // a new base text.
156    let buffer_response_b = db
157        .join_channel_buffer(zed_id, b_id, connection_id_b)
158        .await
159        .unwrap();
160    assert_eq!(buffer_response_b.base_text, "hello, cruel world");
161    assert_eq!(buffer_response_b.operations, &[]);
162}
163
// Registers `test_channel_buffers_last_operations` through the `test_both_dbs!` macro, which
// is defined elsewhere in the crate. The second and third identifiers are presumably the names
// of the generated Postgres and SQLite test cases -- confirm against the macro definition.
test_both_dbs!(
    test_channel_buffers_last_operations,
    test_channel_buffers_last_operations_postgres,
    test_channel_buffers_last_operations_sqlite
);
169
/// Exercises `get_latest_operations_for_buffers`, `unseen_channel_buffer_changes` and
/// `observe_buffer_version` against three channel buffers.
///
/// One user (`user_id`) owns the channels and performs every edit; a second user
/// (`observer_id`) is a member of each channel and is the one whose unseen changes are
/// queried. Local `text::Buffer` values mirror the edits so that expected timestamps and
/// versions can be derived from them.
async fn test_channel_buffers_last_operations(db: &Database) {
    let user_id = db
        .create_user(
            "user_a@example.com",
            false,
            NewUserParams {
                github_login: "user_a".into(),
                github_user_id: 101,
            },
        )
        .await
        .unwrap()
        .user_id;
    let observer_id = db
        .create_user(
            "user_b@example.com",
            false,
            NewUserParams {
                github_login: "user_b".into(),
                github_user_id: 102,
            },
        )
        .await
        .unwrap()
        .user_id;
    let owner_id = db.create_server("production").await.unwrap().0 as u32;
    // The connection id simply reuses the numeric user id; a single connection is used for
    // every buffer in this test.
    let connection_id = ConnectionId {
        owner_id,
        id: user_id.0 as u32,
    };

    // `buffers` holds the database records, `text_buffers` the local mirrors; both are
    // indexed by the same channel number.
    let mut buffers = Vec::new();
    let mut text_buffers = Vec::new();
    for i in 0..3 {
        let channel = db
            .create_root_channel(&format!("channel-{i}"), user_id)
            .await
            .unwrap();

        // Make the observer a member of the channel.
        db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
            .await
            .unwrap();
        db.respond_to_channel_invite(channel, observer_id, true)
            .await
            .unwrap();

        db.join_channel_buffer(channel, user_id, connection_id)
            .await
            .unwrap();

        buffers.push(
            db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
                .await
                .unwrap(),
        );

        text_buffers.push(Buffer::new(0, 0, "".to_string()));
    }

    // Nothing has been written yet, so no latest operations are expected.
    let operations = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
                    .await
            }
        })
        .await
        .unwrap();

    assert!(operations.is_empty());

    // Three edits to buffer 0.
    update_buffer(
        buffers[0].channel_id,
        user_id,
        db,
        vec![
            text_buffers[0].edit([(0..0, "a")]),
            text_buffers[0].edit([(0..0, "b")]),
            text_buffers[0].edit([(0..0, "c")]),
        ],
    )
    .await;

    // Three appending edits to buffer 1, producing the text "def".
    update_buffer(
        buffers[1].channel_id,
        user_id,
        db,
        vec![
            text_buffers[1].edit([(0..0, "d")]),
            text_buffers[1].edit([(1..1, "e")]),
            text_buffers[1].edit([(2..2, "f")]),
        ],
    )
    .await;

    // cause buffer 1's epoch to increment.
    db.leave_channel_buffer(buffers[1].channel_id, connection_id)
        .await
        .unwrap();
    db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
        .await
        .unwrap();
    // Restart the local mirror from the text written so far. The first argument differs
    // from the other mirrors (1 instead of 0); presumably it is the replica id -- confirm
    // against `text::Buffer::new`.
    text_buffers[1] = Buffer::new(1, 0, "def".to_string());
    update_buffer(
        buffers[1].channel_id,
        user_id,
        db,
        vec![
            text_buffers[1].edit([(0..0, "g")]),
            text_buffers[1].edit([(0..0, "h")]),
        ],
    )
    .await;

    // A single edit to buffer 2.
    update_buffer(
        buffers[2].channel_id,
        user_id,
        db,
        vec![text_buffers[2].edit([(0..0, "i")])],
    )
    .await;

    // Latest operations for buffers 1 and 2: buffer 1 is expected at epoch 1, buffer 2 at
    // epoch 0.
    let operations = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
                    .await
            }
        })
        .await
        .unwrap();
    assert_operations(
        &operations,
        &[
            (buffers[1].id, 1, &text_buffers[1]),
            (buffers[2].id, 0, &text_buffers[2]),
        ],
    );

    // Same query for buffers 0 and 1.
    let operations = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
                    .await
            }
        })
        .await
        .unwrap();
    assert_operations(
        &operations,
        &[
            (buffers[0].id, 0, &text_buffers[0]),
            (buffers[1].id, 1, &text_buffers[1]),
        ],
    );

    // The observer has not recorded any version yet, so all three channels are expected
    // to be reported as having unseen changes.
    let buffer_changes = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.unseen_channel_buffer_changes(
                    observer_id,
                    &[
                        buffers[0].channel_id,
                        buffers[1].channel_id,
                        buffers[2].channel_id,
                    ],
                    &*tx,
                )
                .await
            }
        })
        .await
        .unwrap();

    pretty_assertions::assert_eq!(
        buffer_changes,
        [
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[0].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[0].version()),
            },
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[1].channel_id.to_proto(),
                epoch: 1,
                // Only the entries of the local version whose replica id matches the first
                // entry actually returned for buffer 1 are kept in the expected value.
                // NOTE(review): this reads from the value under test, so a short or empty
                // result panics on the index/unwrap instead of failing the assertion.
                version: serialize_version(&text_buffers[1].version())
                    .into_iter()
                    .filter(|vector| vector.replica_id
                        == buffer_changes[1].version.first().unwrap().replica_id)
                    .collect::<Vec<_>>(),
            },
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[2].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[2].version()),
            },
        ]
    );

    // Record that the observer has seen buffer 1 at epoch 1 with its current version.
    db.observe_buffer_version(
        buffers[1].id,
        observer_id,
        1,
        serialize_version(&text_buffers[1].version()).as_slice(),
    )
    .await
    .unwrap();

    let buffer_changes = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.unseen_channel_buffer_changes(
                    observer_id,
                    &[
                        buffers[0].channel_id,
                        buffers[1].channel_id,
                        buffers[2].channel_id,
                    ],
                    &*tx,
                )
                .await
            }
        })
        .await
        .unwrap();

    // Buffer 1 is no longer expected in the list; buffers 0 and 2 remain unseen.
    assert_eq!(
        buffer_changes,
        [
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[0].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[0].version()),
            },
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[2].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[2].version()),
            },
        ]
    );

    // Observe an earlier version of the buffer.
    db.observe_buffer_version(
        buffers[1].id,
        observer_id,
        1,
        &[rpc::proto::VectorClockEntry {
            replica_id: 0,
            timestamp: 0,
        }],
    )
    .await
    .unwrap();

    let buffer_changes = db
        .transaction(|tx| {
            let buffers = &buffers;
            async move {
                db.unseen_channel_buffer_changes(
                    observer_id,
                    &[
                        buffers[0].channel_id,
                        buffers[1].channel_id,
                        buffers[2].channel_id,
                    ],
                    &*tx,
                )
                .await
            }
        })
        .await
        .unwrap();

    // The expected list is the same as before the older observation: buffer 1 must still
    // be absent, i.e. observing an earlier version must not make it unseen again.
    assert_eq!(
        buffer_changes,
        [
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[0].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[0].version()),
            },
            rpc::proto::UnseenChannelBufferChange {
                channel_id: buffers[2].channel_id.to_proto(),
                epoch: 0,
                version: serialize_version(&text_buffers[2].version()),
            },
        ]
    );
}
465
466async fn update_buffer(
467    channel_id: ChannelId,
468    user_id: UserId,
469    db: &Database,
470    operations: Vec<text::Operation>,
471) {
472    let operations = operations
473        .into_iter()
474        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
475        .collect::<Vec<_>>();
476    db.update_channel_buffer(channel_id, user_id, &operations)
477        .await
478        .unwrap();
479}
480
481fn assert_operations(
482    operations: &[buffer_operation::Model],
483    expected: &[(BufferId, i32, &text::Buffer)],
484) {
485    let actual = operations
486        .iter()
487        .map(|op| buffer_operation::Model {
488            buffer_id: op.buffer_id,
489            epoch: op.epoch,
490            lamport_timestamp: op.lamport_timestamp,
491            replica_id: op.replica_id,
492            value: vec![],
493        })
494        .collect::<Vec<_>>();
495    let expected = expected
496        .iter()
497        .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
498            buffer_id: *buffer_id,
499            epoch: *epoch,
500            lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
501            replica_id: buffer.replica_id() as i32,
502            value: vec![],
503        })
504        .collect::<Vec<_>>();
505    assert_eq!(actual, expected, "unexpected operations")
506}