buffer_tests.rs

  1use super::*;
  2use crate::test_both_dbs;
  3use language::proto::{self, serialize_version};
  4use text::{Buffer, ReplicaId};
  5
  6test_both_dbs!(
  7    test_channel_buffers,
  8    test_channel_buffers_postgres,
  9    test_channel_buffers_sqlite
 10);
 11
 12async fn test_channel_buffers(db: &Arc<Database>) {
 13    let a_id = db
 14        .create_user(
 15            "user_a@example.com",
 16            None,
 17            false,
 18            NewUserParams {
 19                github_login: "user_a".into(),
 20                github_user_id: 101,
 21            },
 22        )
 23        .await
 24        .unwrap()
 25        .user_id;
 26    let b_id = db
 27        .create_user(
 28            "user_b@example.com",
 29            None,
 30            false,
 31            NewUserParams {
 32                github_login: "user_b".into(),
 33                github_user_id: 102,
 34            },
 35        )
 36        .await
 37        .unwrap()
 38        .user_id;
 39
 40    // This user will not be a part of the channel
 41    let c_id = db
 42        .create_user(
 43            "user_c@example.com",
 44            None,
 45            false,
 46            NewUserParams {
 47                github_login: "user_c".into(),
 48                github_user_id: 103,
 49            },
 50        )
 51        .await
 52        .unwrap()
 53        .user_id;
 54
 55    let owner_id = db.create_server("production").await.unwrap().0 as u32;
 56
 57    let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
 58
 59    db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
 60        .await
 61        .unwrap();
 62
 63    db.respond_to_channel_invite(zed_id, b_id, true)
 64        .await
 65        .unwrap();
 66
 67    let connection_id_a = ConnectionId { owner_id, id: 1 };
 68    let _ = db
 69        .join_channel_buffer(zed_id, a_id, connection_id_a)
 70        .await
 71        .unwrap();
 72
 73    let mut buffer_a = Buffer::new(
 74        ReplicaId::new(0),
 75        text::BufferId::new(1).unwrap(),
 76        "".to_string(),
 77    );
 78    let operations = vec![
 79        buffer_a.edit([(0..0, "hello world")]),
 80        buffer_a.edit([(5..5, ", cruel")]),
 81        buffer_a.edit([(0..5, "goodbye")]),
 82        buffer_a.undo().unwrap().1,
 83    ];
 84    assert_eq!(buffer_a.text(), "hello, cruel world");
 85
 86    let operations = operations
 87        .into_iter()
 88        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
 89        .collect::<Vec<_>>();
 90
 91    db.update_channel_buffer(zed_id, a_id, &operations)
 92        .await
 93        .unwrap();
 94
 95    let connection_id_b = ConnectionId { owner_id, id: 2 };
 96    let buffer_response_b = db
 97        .join_channel_buffer(zed_id, b_id, connection_id_b)
 98        .await
 99        .unwrap();
100
101    let mut buffer_b = Buffer::new(
102        ReplicaId::new(0),
103        text::BufferId::new(1).unwrap(),
104        buffer_response_b.base_text,
105    );
106    buffer_b.apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
107        let operation = proto::deserialize_operation(operation).unwrap();
108        if let language::Operation::Buffer(operation) = operation {
109            operation
110        } else {
111            unreachable!()
112        }
113    }));
114
115    assert_eq!(buffer_b.text(), "hello, cruel world");
116
117    // Ensure that C fails to open the buffer
118    assert!(
119        db.join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
120            .await
121            .is_err()
122    );
123
124    // Ensure that both collaborators have shown up
125    assert_eq!(
126        buffer_response_b.collaborators,
127        &[
128            rpc::proto::Collaborator {
129                user_id: a_id.to_proto(),
130                peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
131                replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32,
132                is_host: false,
133                committer_name: None,
134                committer_email: None,
135            },
136            rpc::proto::Collaborator {
137                user_id: b_id.to_proto(),
138                peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
139                replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32 + 1,
140                is_host: false,
141                committer_name: None,
142                committer_email: None,
143            }
144        ]
145    );
146
147    // Ensure that get_channel_buffer_collaborators works
148    let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
149    assert_eq!(zed_collaborats, &[a_id, b_id]);
150
151    let left_buffer = db
152        .leave_channel_buffer(zed_id, connection_id_b)
153        .await
154        .unwrap();
155
156    assert_eq!(left_buffer.connections, &[connection_id_a],);
157
158    let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
159    let _ = db
160        .join_channel_buffer(cargo_id, a_id, connection_id_a)
161        .await
162        .unwrap();
163
164    db.leave_channel_buffers(connection_id_a).await.unwrap();
165
166    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
167    let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
168    assert_eq!(zed_collaborators, &[]);
169    assert_eq!(cargo_collaborators, &[]);
170
171    // When everyone has left the channel, the operations are collapsed into
172    // a new base text.
173    let buffer_response_b = db
174        .join_channel_buffer(zed_id, b_id, connection_id_b)
175        .await
176        .unwrap();
177    assert_eq!(buffer_response_b.base_text, "hello, cruel world");
178    assert_eq!(buffer_response_b.operations, &[]);
179}
180
181test_both_dbs!(
182    test_channel_buffers_last_operations,
183    test_channel_buffers_last_operations_postgres,
184    test_channel_buffers_last_operations_sqlite
185);
186
187async fn test_channel_buffers_last_operations(db: &Database) {
188    let user_id = db
189        .create_user(
190            "user_a@example.com",
191            None,
192            false,
193            NewUserParams {
194                github_login: "user_a".into(),
195                github_user_id: 101,
196            },
197        )
198        .await
199        .unwrap()
200        .user_id;
201    let observer_id = db
202        .create_user(
203            "user_b@example.com",
204            None,
205            false,
206            NewUserParams {
207                github_login: "user_b".into(),
208                github_user_id: 102,
209            },
210        )
211        .await
212        .unwrap()
213        .user_id;
214    let owner_id = db.create_server("production").await.unwrap().0 as u32;
215    let connection_id = ConnectionId {
216        owner_id,
217        id: user_id.0 as u32,
218    };
219
220    let mut buffers = Vec::new();
221    let mut text_buffers = Vec::new();
222    for i in 0..3 {
223        let channel = db
224            .create_root_channel(&format!("channel-{i}"), user_id)
225            .await
226            .unwrap();
227
228        db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
229            .await
230            .unwrap();
231        db.respond_to_channel_invite(channel, observer_id, true)
232            .await
233            .unwrap();
234
235        let res = db
236            .join_channel_buffer(channel, user_id, connection_id)
237            .await
238            .unwrap();
239
240        buffers.push(
241            db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
242                .await
243                .unwrap(),
244        );
245
246        text_buffers.push(Buffer::new(
247            ReplicaId::new(res.replica_id as u16),
248            text::BufferId::new(1).unwrap(),
249            "".to_string(),
250        ));
251    }
252
253    update_buffer(
254        buffers[0].channel_id,
255        user_id,
256        db,
257        vec![
258            text_buffers[0].edit([(0..0, "a")]),
259            text_buffers[0].edit([(0..0, "b")]),
260            text_buffers[0].edit([(0..0, "c")]),
261        ],
262    )
263    .await;
264
265    update_buffer(
266        buffers[1].channel_id,
267        user_id,
268        db,
269        vec![
270            text_buffers[1].edit([(0..0, "d")]),
271            text_buffers[1].edit([(1..1, "e")]),
272            text_buffers[1].edit([(2..2, "f")]),
273        ],
274    )
275    .await;
276
277    // cause buffer 1's epoch to increment.
278    db.leave_channel_buffer(buffers[1].channel_id, connection_id)
279        .await
280        .unwrap();
281    db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
282        .await
283        .unwrap();
284    let replica_id = text_buffers[1].replica_id();
285    text_buffers[1] = Buffer::new(
286        replica_id,
287        text::BufferId::new(1).unwrap(),
288        "def".to_string(),
289    );
290    update_buffer(
291        buffers[1].channel_id,
292        user_id,
293        db,
294        vec![
295            text_buffers[1].edit([(0..0, "g")]),
296            text_buffers[1].edit([(0..0, "h")]),
297        ],
298    )
299    .await;
300
301    update_buffer(
302        buffers[2].channel_id,
303        user_id,
304        db,
305        vec![text_buffers[2].edit([(0..0, "i")])],
306    )
307    .await;
308
309    let channels_for_user = db.get_channels_for_user(user_id).await.unwrap();
310
311    pretty_assertions::assert_eq!(
312        channels_for_user.latest_buffer_versions,
313        [
314            rpc::proto::ChannelBufferVersion {
315                channel_id: buffers[0].channel_id.to_proto(),
316                epoch: 0,
317                version: serialize_version(&text_buffers[0].version())
318                    .into_iter()
319                    .filter(
320                        |vector| vector.replica_id == text_buffers[0].replica_id().as_u16() as u32
321                    )
322                    .collect::<Vec<_>>(),
323            },
324            rpc::proto::ChannelBufferVersion {
325                channel_id: buffers[1].channel_id.to_proto(),
326                epoch: 1,
327                version: serialize_version(&text_buffers[1].version())
328                    .into_iter()
329                    .filter(
330                        |vector| vector.replica_id == text_buffers[1].replica_id().as_u16() as u32
331                    )
332                    .collect::<Vec<_>>(),
333            },
334            rpc::proto::ChannelBufferVersion {
335                channel_id: buffers[2].channel_id.to_proto(),
336                epoch: 0,
337                version: serialize_version(&text_buffers[2].version())
338                    .into_iter()
339                    .filter(
340                        |vector| vector.replica_id == text_buffers[2].replica_id().as_u16() as u32
341                    )
342                    .collect::<Vec<_>>(),
343            },
344        ]
345    );
346}
347
348async fn update_buffer(
349    channel_id: ChannelId,
350    user_id: UserId,
351    db: &Database,
352    operations: Vec<text::Operation>,
353) {
354    let operations = operations
355        .into_iter()
356        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
357        .collect::<Vec<_>>();
358    db.update_channel_buffer(channel_id, user_id, &operations)
359        .await
360        .unwrap();
361}