buffer_tests.rs

  1use super::*;
  2use crate::test_both_dbs;
  3use language::proto::{self, serialize_version};
  4use text::Buffer;
  5
  6test_both_dbs!(
  7    test_channel_buffers,
  8    test_channel_buffers_postgres,
  9    test_channel_buffers_sqlite
 10);
 11
 12async fn test_channel_buffers(db: &Arc<Database>) {
 13    let a_id = db
 14        .create_user(
 15            "user_a@example.com",
 16            false,
 17            NewUserParams {
 18                github_login: "user_a".into(),
 19                github_user_id: 101,
 20            },
 21        )
 22        .await
 23        .unwrap()
 24        .user_id;
 25    let b_id = db
 26        .create_user(
 27            "user_b@example.com",
 28            false,
 29            NewUserParams {
 30                github_login: "user_b".into(),
 31                github_user_id: 102,
 32            },
 33        )
 34        .await
 35        .unwrap()
 36        .user_id;
 37
 38    // This user will not be a part of the channel
 39    let c_id = db
 40        .create_user(
 41            "user_c@example.com",
 42            false,
 43            NewUserParams {
 44                github_login: "user_c".into(),
 45                github_user_id: 102,
 46            },
 47        )
 48        .await
 49        .unwrap()
 50        .user_id;
 51
 52    let owner_id = db.create_server("production").await.unwrap().0 as u32;
 53
 54    let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
 55
 56    db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
 57        .await
 58        .unwrap();
 59
 60    db.respond_to_channel_invite(zed_id, b_id, true)
 61        .await
 62        .unwrap();
 63
 64    let connection_id_a = ConnectionId { owner_id, id: 1 };
 65    let _ = db
 66        .join_channel_buffer(zed_id, a_id, connection_id_a)
 67        .await
 68        .unwrap();
 69
 70    let mut buffer_a = Buffer::new(0, text::BufferId::new(1).unwrap(), "".to_string());
 71    let mut operations = Vec::new();
 72    operations.push(buffer_a.edit([(0..0, "hello world")]));
 73    operations.push(buffer_a.edit([(5..5, ", cruel")]));
 74    operations.push(buffer_a.edit([(0..5, "goodbye")]));
 75    operations.push(buffer_a.undo().unwrap().1);
 76    assert_eq!(buffer_a.text(), "hello, cruel world");
 77
 78    let operations = operations
 79        .into_iter()
 80        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
 81        .collect::<Vec<_>>();
 82
 83    db.update_channel_buffer(zed_id, a_id, &operations)
 84        .await
 85        .unwrap();
 86
 87    let connection_id_b = ConnectionId { owner_id, id: 2 };
 88    let buffer_response_b = db
 89        .join_channel_buffer(zed_id, b_id, connection_id_b)
 90        .await
 91        .unwrap();
 92
 93    let mut buffer_b = Buffer::new(
 94        0,
 95        text::BufferId::new(1).unwrap(),
 96        buffer_response_b.base_text,
 97    );
 98    buffer_b
 99        .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
100            let operation = proto::deserialize_operation(operation).unwrap();
101            if let language::Operation::Buffer(operation) = operation {
102                operation
103            } else {
104                unreachable!()
105            }
106        }))
107        .unwrap();
108
109    assert_eq!(buffer_b.text(), "hello, cruel world");
110
111    // Ensure that C fails to open the buffer
112    assert!(db
113        .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
114        .await
115        .is_err());
116
117    // Ensure that both collaborators have shown up
118    assert_eq!(
119        buffer_response_b.collaborators,
120        &[
121            rpc::proto::Collaborator {
122                user_id: a_id.to_proto(),
123                peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
124                replica_id: 0,
125            },
126            rpc::proto::Collaborator {
127                user_id: b_id.to_proto(),
128                peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
129                replica_id: 1,
130            }
131        ]
132    );
133
134    // Ensure that get_channel_buffer_collaborators works
135    let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
136    assert_eq!(zed_collaborats, &[a_id, b_id]);
137
138    let left_buffer = db
139        .leave_channel_buffer(zed_id, connection_id_b)
140        .await
141        .unwrap();
142
143    assert_eq!(left_buffer.connections, &[connection_id_a],);
144
145    let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
146    let _ = db
147        .join_channel_buffer(cargo_id, a_id, connection_id_a)
148        .await
149        .unwrap();
150
151    db.leave_channel_buffers(connection_id_a).await.unwrap();
152
153    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
154    let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
155    assert_eq!(zed_collaborators, &[]);
156    assert_eq!(cargo_collaborators, &[]);
157
158    // When everyone has left the channel, the operations are collapsed into
159    // a new base text.
160    let buffer_response_b = db
161        .join_channel_buffer(zed_id, b_id, connection_id_b)
162        .await
163        .unwrap();
164    assert_eq!(buffer_response_b.base_text, "hello, cruel world");
165    assert_eq!(buffer_response_b.operations, &[]);
166}
167
168test_both_dbs!(
169    test_channel_buffers_last_operations,
170    test_channel_buffers_last_operations_postgres,
171    test_channel_buffers_last_operations_sqlite
172);
173
174async fn test_channel_buffers_last_operations(db: &Database) {
175    let user_id = db
176        .create_user(
177            "user_a@example.com",
178            false,
179            NewUserParams {
180                github_login: "user_a".into(),
181                github_user_id: 101,
182            },
183        )
184        .await
185        .unwrap()
186        .user_id;
187    let observer_id = db
188        .create_user(
189            "user_b@example.com",
190            false,
191            NewUserParams {
192                github_login: "user_b".into(),
193                github_user_id: 102,
194            },
195        )
196        .await
197        .unwrap()
198        .user_id;
199    let owner_id = db.create_server("production").await.unwrap().0 as u32;
200    let connection_id = ConnectionId {
201        owner_id,
202        id: user_id.0 as u32,
203    };
204
205    let mut buffers = Vec::new();
206    let mut text_buffers = Vec::new();
207    for i in 0..3 {
208        let channel = db
209            .create_root_channel(&format!("channel-{i}"), user_id)
210            .await
211            .unwrap();
212
213        db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
214            .await
215            .unwrap();
216        db.respond_to_channel_invite(channel, observer_id, true)
217            .await
218            .unwrap();
219
220        db.join_channel_buffer(channel, user_id, connection_id)
221            .await
222            .unwrap();
223
224        buffers.push(
225            db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
226                .await
227                .unwrap(),
228        );
229
230        text_buffers.push(Buffer::new(
231            0,
232            text::BufferId::new(1).unwrap(),
233            "".to_string(),
234        ));
235    }
236
237    let operations = db
238        .transaction(|tx| {
239            let buffers = &buffers;
240            async move {
241                db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
242                    .await
243            }
244        })
245        .await
246        .unwrap();
247
248    assert!(operations.is_empty());
249
250    update_buffer(
251        buffers[0].channel_id,
252        user_id,
253        db,
254        vec![
255            text_buffers[0].edit([(0..0, "a")]),
256            text_buffers[0].edit([(0..0, "b")]),
257            text_buffers[0].edit([(0..0, "c")]),
258        ],
259    )
260    .await;
261
262    update_buffer(
263        buffers[1].channel_id,
264        user_id,
265        db,
266        vec![
267            text_buffers[1].edit([(0..0, "d")]),
268            text_buffers[1].edit([(1..1, "e")]),
269            text_buffers[1].edit([(2..2, "f")]),
270        ],
271    )
272    .await;
273
274    // cause buffer 1's epoch to increment.
275    db.leave_channel_buffer(buffers[1].channel_id, connection_id)
276        .await
277        .unwrap();
278    db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
279        .await
280        .unwrap();
281    text_buffers[1] = Buffer::new(1, text::BufferId::new(1).unwrap(), "def".to_string());
282    update_buffer(
283        buffers[1].channel_id,
284        user_id,
285        db,
286        vec![
287            text_buffers[1].edit([(0..0, "g")]),
288            text_buffers[1].edit([(0..0, "h")]),
289        ],
290    )
291    .await;
292
293    update_buffer(
294        buffers[2].channel_id,
295        user_id,
296        db,
297        vec![text_buffers[2].edit([(0..0, "i")])],
298    )
299    .await;
300
301    let operations = db
302        .transaction(|tx| {
303            let buffers = &buffers;
304            async move {
305                db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
306                    .await
307            }
308        })
309        .await
310        .unwrap();
311    assert_operations(
312        &operations,
313        &[
314            (buffers[1].id, 1, &text_buffers[1]),
315            (buffers[2].id, 0, &text_buffers[2]),
316        ],
317    );
318
319    let operations = db
320        .transaction(|tx| {
321            let buffers = &buffers;
322            async move {
323                db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
324                    .await
325            }
326        })
327        .await
328        .unwrap();
329    assert_operations(
330        &operations,
331        &[
332            (buffers[0].id, 0, &text_buffers[0]),
333            (buffers[1].id, 1, &text_buffers[1]),
334        ],
335    );
336
337    let buffer_changes = db
338        .transaction(|tx| {
339            let buffers = &buffers;
340            let mut hash = HashMap::default();
341            hash.insert(buffers[0].id, buffers[0].channel_id);
342            hash.insert(buffers[1].id, buffers[1].channel_id);
343            hash.insert(buffers[2].id, buffers[2].channel_id);
344
345            async move { db.latest_channel_buffer_changes(&hash, &*tx).await }
346        })
347        .await
348        .unwrap();
349
350    pretty_assertions::assert_eq!(
351        buffer_changes,
352        [
353            rpc::proto::ChannelBufferVersion {
354                channel_id: buffers[0].channel_id.to_proto(),
355                epoch: 0,
356                version: serialize_version(&text_buffers[0].version()),
357            },
358            rpc::proto::ChannelBufferVersion {
359                channel_id: buffers[1].channel_id.to_proto(),
360                epoch: 1,
361                version: serialize_version(&text_buffers[1].version())
362                    .into_iter()
363                    .filter(|vector| vector.replica_id
364                        == buffer_changes[1].version.first().unwrap().replica_id)
365                    .collect::<Vec<_>>(),
366            },
367            rpc::proto::ChannelBufferVersion {
368                channel_id: buffers[2].channel_id.to_proto(),
369                epoch: 0,
370                version: serialize_version(&text_buffers[2].version()),
371            },
372        ]
373    );
374}
375
376async fn update_buffer(
377    channel_id: ChannelId,
378    user_id: UserId,
379    db: &Database,
380    operations: Vec<text::Operation>,
381) {
382    let operations = operations
383        .into_iter()
384        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
385        .collect::<Vec<_>>();
386    db.update_channel_buffer(channel_id, user_id, &operations)
387        .await
388        .unwrap();
389}
390
391fn assert_operations(
392    operations: &[buffer_operation::Model],
393    expected: &[(BufferId, i32, &text::Buffer)],
394) {
395    let actual = operations
396        .iter()
397        .map(|op| buffer_operation::Model {
398            buffer_id: op.buffer_id,
399            epoch: op.epoch,
400            lamport_timestamp: op.lamport_timestamp,
401            replica_id: op.replica_id,
402            value: vec![],
403        })
404        .collect::<Vec<_>>();
405    let expected = expected
406        .iter()
407        .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
408            buffer_id: *buffer_id,
409            epoch: *epoch,
410            lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
411            replica_id: buffer.replica_id() as i32,
412            value: vec![],
413        })
414        .collect::<Vec<_>>();
415    assert_eq!(actual, expected, "unexpected operations")
416}