buffer_tests.rs

  1use super::*;
  2use crate::test_both_dbs;
  3use language::proto::{self, serialize_version};
  4use rpc::ConnectionId;
  5use text::{Buffer, ReplicaId};
  6
  7test_both_dbs!(
  8    test_channel_buffers,
  9    test_channel_buffers_postgres,
 10    test_channel_buffers_sqlite
 11);
 12
 13async fn test_channel_buffers(db: &Arc<Database>) {
 14    let a_id = db
 15        .create_user(
 16            "user_a@example.com",
 17            None,
 18            false,
 19            NewUserParams {
 20                github_login: "user_a".into(),
 21                github_user_id: 101,
 22            },
 23        )
 24        .await
 25        .unwrap()
 26        .user_id;
 27    let b_id = db
 28        .create_user(
 29            "user_b@example.com",
 30            None,
 31            false,
 32            NewUserParams {
 33                github_login: "user_b".into(),
 34                github_user_id: 102,
 35            },
 36        )
 37        .await
 38        .unwrap()
 39        .user_id;
 40
 41    // This user will not be a part of the channel
 42    let c_id = db
 43        .create_user(
 44            "user_c@example.com",
 45            None,
 46            false,
 47            NewUserParams {
 48                github_login: "user_c".into(),
 49                github_user_id: 103,
 50            },
 51        )
 52        .await
 53        .unwrap()
 54        .user_id;
 55
 56    let owner_id = db.create_server("production").await.unwrap().0 as u32;
 57
 58    let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
 59
 60    db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
 61        .await
 62        .unwrap();
 63
 64    db.respond_to_channel_invite(zed_id, b_id, true)
 65        .await
 66        .unwrap();
 67
 68    let connection_id_a = ConnectionId { owner_id, id: 1 };
 69    let _ = db
 70        .join_channel_buffer(zed_id, a_id, connection_id_a)
 71        .await
 72        .unwrap();
 73
 74    let mut buffer_a = Buffer::new(
 75        ReplicaId::new(0),
 76        text::BufferId::new(1).unwrap(),
 77        "".to_string(),
 78    );
 79    let operations = vec![
 80        buffer_a.edit([(0..0, "hello world")]),
 81        buffer_a.edit([(5..5, ", cruel")]),
 82        buffer_a.edit([(0..5, "goodbye")]),
 83        buffer_a.undo().unwrap().1,
 84    ];
 85    assert_eq!(buffer_a.text(), "hello, cruel world");
 86
 87    let operations = operations
 88        .into_iter()
 89        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
 90        .collect::<Vec<_>>();
 91
 92    db.update_channel_buffer(zed_id, a_id, &operations)
 93        .await
 94        .unwrap();
 95
 96    let connection_id_b = ConnectionId { owner_id, id: 2 };
 97    let buffer_response_b = db
 98        .join_channel_buffer(zed_id, b_id, connection_id_b)
 99        .await
100        .unwrap();
101
102    let mut buffer_b = Buffer::new(
103        ReplicaId::new(0),
104        text::BufferId::new(1).unwrap(),
105        buffer_response_b.base_text,
106    );
107    buffer_b.apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
108        let operation = proto::deserialize_operation(operation).unwrap();
109        if let language::Operation::Buffer(operation) = operation {
110            operation
111        } else {
112            unreachable!()
113        }
114    }));
115
116    assert_eq!(buffer_b.text(), "hello, cruel world");
117
118    // Ensure that C fails to open the buffer
119    assert!(
120        db.join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
121            .await
122            .is_err()
123    );
124
125    // Ensure that both collaborators have shown up
126    assert_eq!(
127        buffer_response_b.collaborators,
128        &[
129            rpc::proto::Collaborator {
130                user_id: a_id.to_proto(),
131                peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
132                replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32,
133                is_host: false,
134                committer_name: None,
135                committer_email: None,
136            },
137            rpc::proto::Collaborator {
138                user_id: b_id.to_proto(),
139                peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
140                replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32 + 1,
141                is_host: false,
142                committer_name: None,
143                committer_email: None,
144            }
145        ]
146    );
147
148    // Ensure that get_channel_buffer_collaborators works
149    let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
150    assert_eq!(zed_collaborats, &[a_id, b_id]);
151
152    let left_buffer = db
153        .leave_channel_buffer(zed_id, connection_id_b)
154        .await
155        .unwrap();
156
157    assert_eq!(left_buffer.connections, &[connection_id_a],);
158
159    let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
160    let _ = db
161        .join_channel_buffer(cargo_id, a_id, connection_id_a)
162        .await
163        .unwrap();
164
165    db.leave_channel_buffers(connection_id_a).await.unwrap();
166
167    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
168    let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
169    assert_eq!(zed_collaborators, &[]);
170    assert_eq!(cargo_collaborators, &[]);
171
172    // When everyone has left the channel, the operations are collapsed into
173    // a new base text.
174    let buffer_response_b = db
175        .join_channel_buffer(zed_id, b_id, connection_id_b)
176        .await
177        .unwrap();
178    assert_eq!(buffer_response_b.base_text, "hello, cruel world");
179    assert_eq!(buffer_response_b.operations, &[]);
180}
181
182test_both_dbs!(
183    test_channel_buffers_last_operations,
184    test_channel_buffers_last_operations_postgres,
185    test_channel_buffers_last_operations_sqlite
186);
187
188async fn test_channel_buffers_last_operations(db: &Database) {
189    let user_id = db
190        .create_user(
191            "user_a@example.com",
192            None,
193            false,
194            NewUserParams {
195                github_login: "user_a".into(),
196                github_user_id: 101,
197            },
198        )
199        .await
200        .unwrap()
201        .user_id;
202    let observer_id = db
203        .create_user(
204            "user_b@example.com",
205            None,
206            false,
207            NewUserParams {
208                github_login: "user_b".into(),
209                github_user_id: 102,
210            },
211        )
212        .await
213        .unwrap()
214        .user_id;
215    let owner_id = db.create_server("production").await.unwrap().0 as u32;
216    let connection_id = ConnectionId {
217        owner_id,
218        id: user_id.0 as u32,
219    };
220
221    let mut buffers = Vec::new();
222    let mut text_buffers = Vec::new();
223    for i in 0..3 {
224        let channel = db
225            .create_root_channel(&format!("channel-{i}"), user_id)
226            .await
227            .unwrap();
228
229        db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
230            .await
231            .unwrap();
232        db.respond_to_channel_invite(channel, observer_id, true)
233            .await
234            .unwrap();
235
236        let res = db
237            .join_channel_buffer(channel, user_id, connection_id)
238            .await
239            .unwrap();
240
241        buffers.push(
242            db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
243                .await
244                .unwrap(),
245        );
246
247        text_buffers.push(Buffer::new(
248            ReplicaId::new(res.replica_id as u16),
249            text::BufferId::new(1).unwrap(),
250            "".to_string(),
251        ));
252    }
253
254    update_buffer(
255        buffers[0].channel_id,
256        user_id,
257        db,
258        vec![
259            text_buffers[0].edit([(0..0, "a")]),
260            text_buffers[0].edit([(0..0, "b")]),
261            text_buffers[0].edit([(0..0, "c")]),
262        ],
263    )
264    .await;
265
266    update_buffer(
267        buffers[1].channel_id,
268        user_id,
269        db,
270        vec![
271            text_buffers[1].edit([(0..0, "d")]),
272            text_buffers[1].edit([(1..1, "e")]),
273            text_buffers[1].edit([(2..2, "f")]),
274        ],
275    )
276    .await;
277
278    // cause buffer 1's epoch to increment.
279    db.leave_channel_buffer(buffers[1].channel_id, connection_id)
280        .await
281        .unwrap();
282    db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
283        .await
284        .unwrap();
285    let replica_id = text_buffers[1].replica_id();
286    text_buffers[1] = Buffer::new(
287        replica_id,
288        text::BufferId::new(1).unwrap(),
289        "def".to_string(),
290    );
291    update_buffer(
292        buffers[1].channel_id,
293        user_id,
294        db,
295        vec![
296            text_buffers[1].edit([(0..0, "g")]),
297            text_buffers[1].edit([(0..0, "h")]),
298        ],
299    )
300    .await;
301
302    update_buffer(
303        buffers[2].channel_id,
304        user_id,
305        db,
306        vec![text_buffers[2].edit([(0..0, "i")])],
307    )
308    .await;
309
310    let channels_for_user = db.get_channels_for_user(user_id).await.unwrap();
311
312    pretty_assertions::assert_eq!(
313        channels_for_user.latest_buffer_versions,
314        [
315            rpc::proto::ChannelBufferVersion {
316                channel_id: buffers[0].channel_id.to_proto(),
317                epoch: 0,
318                version: serialize_version(&text_buffers[0].version())
319                    .into_iter()
320                    .filter(
321                        |vector| vector.replica_id == text_buffers[0].replica_id().as_u16() as u32
322                    )
323                    .collect::<Vec<_>>(),
324            },
325            rpc::proto::ChannelBufferVersion {
326                channel_id: buffers[1].channel_id.to_proto(),
327                epoch: 1,
328                version: serialize_version(&text_buffers[1].version())
329                    .into_iter()
330                    .filter(
331                        |vector| vector.replica_id == text_buffers[1].replica_id().as_u16() as u32
332                    )
333                    .collect::<Vec<_>>(),
334            },
335            rpc::proto::ChannelBufferVersion {
336                channel_id: buffers[2].channel_id.to_proto(),
337                epoch: 0,
338                version: serialize_version(&text_buffers[2].version())
339                    .into_iter()
340                    .filter(
341                        |vector| vector.replica_id == text_buffers[2].replica_id().as_u16() as u32
342                    )
343                    .collect::<Vec<_>>(),
344            },
345        ]
346    );
347}
348
349async fn update_buffer(
350    channel_id: ChannelId,
351    user_id: UserId,
352    db: &Database,
353    operations: Vec<text::Operation>,
354) {
355    let operations = operations
356        .into_iter()
357        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
358        .collect::<Vec<_>>();
359    db.update_channel_buffer(channel_id, user_id, &operations)
360        .await
361        .unwrap();
362}