buffer_tests.rs

  1use super::*;
  2use crate::test_both_dbs;
  3use language::proto::{self, serialize_version};
  4use text::Buffer;
  5
  6test_both_dbs!(
  7    test_channel_buffers,
  8    test_channel_buffers_postgres,
  9    test_channel_buffers_sqlite
 10);
 11
 12async fn test_channel_buffers(db: &Arc<Database>) {
 13    let a_id = db
 14        .create_user(
 15            "user_a@example.com",
 16            false,
 17            NewUserParams {
 18                github_login: "user_a".into(),
 19                github_user_id: 101,
 20            },
 21        )
 22        .await
 23        .unwrap()
 24        .user_id;
 25    let b_id = db
 26        .create_user(
 27            "user_b@example.com",
 28            false,
 29            NewUserParams {
 30                github_login: "user_b".into(),
 31                github_user_id: 102,
 32            },
 33        )
 34        .await
 35        .unwrap()
 36        .user_id;
 37
 38    // This user will not be a part of the channel
 39    let c_id = db
 40        .create_user(
 41            "user_c@example.com",
 42            false,
 43            NewUserParams {
 44                github_login: "user_c".into(),
 45                github_user_id: 102,
 46            },
 47        )
 48        .await
 49        .unwrap()
 50        .user_id;
 51
 52    let owner_id = db.create_server("production").await.unwrap().0 as u32;
 53
 54    let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
 55
 56    db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
 57        .await
 58        .unwrap();
 59
 60    db.respond_to_channel_invite(zed_id, b_id, true)
 61        .await
 62        .unwrap();
 63
 64    let connection_id_a = ConnectionId { owner_id, id: 1 };
 65    let _ = db
 66        .join_channel_buffer(zed_id, a_id, connection_id_a)
 67        .await
 68        .unwrap();
 69
 70    let mut buffer_a = Buffer::new(0, text::BufferId::new(1).unwrap(), "".to_string());
 71    let operations = vec![
 72        buffer_a.edit([(0..0, "hello world")]),
 73        buffer_a.edit([(5..5, ", cruel")]),
 74        buffer_a.edit([(0..5, "goodbye")]),
 75        buffer_a.undo().unwrap().1,
 76    ];
 77    assert_eq!(buffer_a.text(), "hello, cruel world");
 78
 79    let operations = operations
 80        .into_iter()
 81        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
 82        .collect::<Vec<_>>();
 83
 84    db.update_channel_buffer(zed_id, a_id, &operations)
 85        .await
 86        .unwrap();
 87
 88    let connection_id_b = ConnectionId { owner_id, id: 2 };
 89    let buffer_response_b = db
 90        .join_channel_buffer(zed_id, b_id, connection_id_b)
 91        .await
 92        .unwrap();
 93
 94    let mut buffer_b = Buffer::new(
 95        0,
 96        text::BufferId::new(1).unwrap(),
 97        buffer_response_b.base_text,
 98    );
 99    buffer_b
100        .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
101            let operation = proto::deserialize_operation(operation).unwrap();
102            if let language::Operation::Buffer(operation) = operation {
103                operation
104            } else {
105                unreachable!()
106            }
107        }))
108        .unwrap();
109
110    assert_eq!(buffer_b.text(), "hello, cruel world");
111
112    // Ensure that C fails to open the buffer
113    assert!(db
114        .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
115        .await
116        .is_err());
117
118    // Ensure that both collaborators have shown up
119    assert_eq!(
120        buffer_response_b.collaborators,
121        &[
122            rpc::proto::Collaborator {
123                user_id: a_id.to_proto(),
124                peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
125                replica_id: 0,
126            },
127            rpc::proto::Collaborator {
128                user_id: b_id.to_proto(),
129                peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
130                replica_id: 1,
131            }
132        ]
133    );
134
135    // Ensure that get_channel_buffer_collaborators works
136    let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
137    assert_eq!(zed_collaborats, &[a_id, b_id]);
138
139    let left_buffer = db
140        .leave_channel_buffer(zed_id, connection_id_b)
141        .await
142        .unwrap();
143
144    assert_eq!(left_buffer.connections, &[connection_id_a],);
145
146    let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
147    let _ = db
148        .join_channel_buffer(cargo_id, a_id, connection_id_a)
149        .await
150        .unwrap();
151
152    db.leave_channel_buffers(connection_id_a).await.unwrap();
153
154    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
155    let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
156    assert_eq!(zed_collaborators, &[]);
157    assert_eq!(cargo_collaborators, &[]);
158
159    // When everyone has left the channel, the operations are collapsed into
160    // a new base text.
161    let buffer_response_b = db
162        .join_channel_buffer(zed_id, b_id, connection_id_b)
163        .await
164        .unwrap();
165    assert_eq!(buffer_response_b.base_text, "hello, cruel world");
166    assert_eq!(buffer_response_b.operations, &[]);
167}
168
169test_both_dbs!(
170    test_channel_buffers_last_operations,
171    test_channel_buffers_last_operations_postgres,
172    test_channel_buffers_last_operations_sqlite
173);
174
175async fn test_channel_buffers_last_operations(db: &Database) {
176    let user_id = db
177        .create_user(
178            "user_a@example.com",
179            false,
180            NewUserParams {
181                github_login: "user_a".into(),
182                github_user_id: 101,
183            },
184        )
185        .await
186        .unwrap()
187        .user_id;
188    let observer_id = db
189        .create_user(
190            "user_b@example.com",
191            false,
192            NewUserParams {
193                github_login: "user_b".into(),
194                github_user_id: 102,
195            },
196        )
197        .await
198        .unwrap()
199        .user_id;
200    let owner_id = db.create_server("production").await.unwrap().0 as u32;
201    let connection_id = ConnectionId {
202        owner_id,
203        id: user_id.0 as u32,
204    };
205
206    let mut buffers = Vec::new();
207    let mut text_buffers = Vec::new();
208    for i in 0..3 {
209        let channel = db
210            .create_root_channel(&format!("channel-{i}"), user_id)
211            .await
212            .unwrap();
213
214        db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
215            .await
216            .unwrap();
217        db.respond_to_channel_invite(channel, observer_id, true)
218            .await
219            .unwrap();
220
221        db.join_channel_buffer(channel, user_id, connection_id)
222            .await
223            .unwrap();
224
225        buffers.push(
226            db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
227                .await
228                .unwrap(),
229        );
230
231        text_buffers.push(Buffer::new(
232            0,
233            text::BufferId::new(1).unwrap(),
234            "".to_string(),
235        ));
236    }
237
238    let operations = db
239        .transaction(|tx| {
240            let buffers = &buffers;
241            async move {
242                db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &tx)
243                    .await
244            }
245        })
246        .await
247        .unwrap();
248
249    assert!(operations.is_empty());
250
251    update_buffer(
252        buffers[0].channel_id,
253        user_id,
254        db,
255        vec![
256            text_buffers[0].edit([(0..0, "a")]),
257            text_buffers[0].edit([(0..0, "b")]),
258            text_buffers[0].edit([(0..0, "c")]),
259        ],
260    )
261    .await;
262
263    update_buffer(
264        buffers[1].channel_id,
265        user_id,
266        db,
267        vec![
268            text_buffers[1].edit([(0..0, "d")]),
269            text_buffers[1].edit([(1..1, "e")]),
270            text_buffers[1].edit([(2..2, "f")]),
271        ],
272    )
273    .await;
274
275    // cause buffer 1's epoch to increment.
276    db.leave_channel_buffer(buffers[1].channel_id, connection_id)
277        .await
278        .unwrap();
279    db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
280        .await
281        .unwrap();
282    text_buffers[1] = Buffer::new(1, text::BufferId::new(1).unwrap(), "def".to_string());
283    update_buffer(
284        buffers[1].channel_id,
285        user_id,
286        db,
287        vec![
288            text_buffers[1].edit([(0..0, "g")]),
289            text_buffers[1].edit([(0..0, "h")]),
290        ],
291    )
292    .await;
293
294    update_buffer(
295        buffers[2].channel_id,
296        user_id,
297        db,
298        vec![text_buffers[2].edit([(0..0, "i")])],
299    )
300    .await;
301
302    let operations = db
303        .transaction(|tx| {
304            let buffers = &buffers;
305            async move {
306                db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &tx)
307                    .await
308            }
309        })
310        .await
311        .unwrap();
312    assert_operations(
313        &operations,
314        &[
315            (buffers[1].id, 1, &text_buffers[1]),
316            (buffers[2].id, 0, &text_buffers[2]),
317        ],
318    );
319
320    let operations = db
321        .transaction(|tx| {
322            let buffers = &buffers;
323            async move {
324                db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &tx)
325                    .await
326            }
327        })
328        .await
329        .unwrap();
330    assert_operations(
331        &operations,
332        &[
333            (buffers[0].id, 0, &text_buffers[0]),
334            (buffers[1].id, 1, &text_buffers[1]),
335        ],
336    );
337
338    let buffer_changes = db
339        .transaction(|tx| {
340            let buffers = &buffers;
341            let mut hash = HashMap::default();
342            hash.insert(buffers[0].id, buffers[0].channel_id);
343            hash.insert(buffers[1].id, buffers[1].channel_id);
344            hash.insert(buffers[2].id, buffers[2].channel_id);
345
346            async move { db.latest_channel_buffer_changes(&hash, &tx).await }
347        })
348        .await
349        .unwrap();
350
351    pretty_assertions::assert_eq!(
352        buffer_changes,
353        [
354            rpc::proto::ChannelBufferVersion {
355                channel_id: buffers[0].channel_id.to_proto(),
356                epoch: 0,
357                version: serialize_version(&text_buffers[0].version()),
358            },
359            rpc::proto::ChannelBufferVersion {
360                channel_id: buffers[1].channel_id.to_proto(),
361                epoch: 1,
362                version: serialize_version(&text_buffers[1].version())
363                    .into_iter()
364                    .filter(|vector| vector.replica_id
365                        == buffer_changes[1].version.first().unwrap().replica_id)
366                    .collect::<Vec<_>>(),
367            },
368            rpc::proto::ChannelBufferVersion {
369                channel_id: buffers[2].channel_id.to_proto(),
370                epoch: 0,
371                version: serialize_version(&text_buffers[2].version()),
372            },
373        ]
374    );
375}
376
377async fn update_buffer(
378    channel_id: ChannelId,
379    user_id: UserId,
380    db: &Database,
381    operations: Vec<text::Operation>,
382) {
383    let operations = operations
384        .into_iter()
385        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
386        .collect::<Vec<_>>();
387    db.update_channel_buffer(channel_id, user_id, &operations)
388        .await
389        .unwrap();
390}
391
392fn assert_operations(
393    operations: &[buffer_operation::Model],
394    expected: &[(BufferId, i32, &text::Buffer)],
395) {
396    let actual = operations
397        .iter()
398        .map(|op| buffer_operation::Model {
399            buffer_id: op.buffer_id,
400            epoch: op.epoch,
401            lamport_timestamp: op.lamport_timestamp,
402            replica_id: op.replica_id,
403            value: vec![],
404        })
405        .collect::<Vec<_>>();
406    let expected = expected
407        .iter()
408        .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
409            buffer_id: *buffer_id,
410            epoch: *epoch,
411            lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
412            replica_id: buffer.replica_id() as i32,
413            value: vec![],
414        })
415        .collect::<Vec<_>>();
416    assert_eq!(actual, expected, "unexpected operations")
417}