1use super::*;
  2use crate::test_both_dbs;
  3use language::proto::{self, serialize_version};
  4use text::{Buffer, ReplicaId};
  5
// Hands `test_channel_buffers` to the `test_both_dbs!` macro together with the
// two identifiers `test_channel_buffers_postgres` and
// `test_channel_buffers_sqlite`.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it
// emits one test per database backend that calls the shared body — confirm
// against the macro definition.
test_both_dbs!(
    test_channel_buffers,
    test_channel_buffers_postgres,
    test_channel_buffers_sqlite
);
 11
 12async fn test_channel_buffers(db: &Arc<Database>) {
 13    let a_id = db
 14        .create_user(
 15            "user_a@example.com",
 16            None,
 17            false,
 18            NewUserParams {
 19                github_login: "user_a".into(),
 20                github_user_id: 101,
 21            },
 22        )
 23        .await
 24        .unwrap()
 25        .user_id;
 26    let b_id = db
 27        .create_user(
 28            "user_b@example.com",
 29            None,
 30            false,
 31            NewUserParams {
 32                github_login: "user_b".into(),
 33                github_user_id: 102,
 34            },
 35        )
 36        .await
 37        .unwrap()
 38        .user_id;
 39
 40    // This user will not be a part of the channel
 41    let c_id = db
 42        .create_user(
 43            "user_c@example.com",
 44            None,
 45            false,
 46            NewUserParams {
 47                github_login: "user_c".into(),
 48                github_user_id: 103,
 49            },
 50        )
 51        .await
 52        .unwrap()
 53        .user_id;
 54
 55    let owner_id = db.create_server("production").await.unwrap().0 as u32;
 56
 57    let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
 58
 59    db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
 60        .await
 61        .unwrap();
 62
 63    db.respond_to_channel_invite(zed_id, b_id, true)
 64        .await
 65        .unwrap();
 66
 67    let connection_id_a = ConnectionId { owner_id, id: 1 };
 68    let _ = db
 69        .join_channel_buffer(zed_id, a_id, connection_id_a)
 70        .await
 71        .unwrap();
 72
 73    let mut buffer_a = Buffer::new(
 74        ReplicaId::new(0),
 75        text::BufferId::new(1).unwrap(),
 76        "".to_string(),
 77        &db.test_options.as_ref().unwrap().executor,
 78    );
 79    let operations = vec![
 80        buffer_a.edit(
 81            [(0..0, "hello world")],
 82            &db.test_options.as_ref().unwrap().executor,
 83        ),
 84        buffer_a.edit(
 85            [(5..5, ", cruel")],
 86            &db.test_options.as_ref().unwrap().executor,
 87        ),
 88        buffer_a.edit(
 89            [(0..5, "goodbye")],
 90            &db.test_options.as_ref().unwrap().executor,
 91        ),
 92        buffer_a.undo().unwrap().1,
 93    ];
 94    assert_eq!(buffer_a.text(), "hello, cruel world");
 95
 96    let operations = operations
 97        .into_iter()
 98        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
 99        .collect::<Vec<_>>();
100
101    db.update_channel_buffer(zed_id, a_id, &operations)
102        .await
103        .unwrap();
104
105    let connection_id_b = ConnectionId { owner_id, id: 2 };
106    let buffer_response_b = db
107        .join_channel_buffer(zed_id, b_id, connection_id_b)
108        .await
109        .unwrap();
110
111    let mut buffer_b = Buffer::new(
112        ReplicaId::new(0),
113        text::BufferId::new(1).unwrap(),
114        buffer_response_b.base_text,
115        &db.test_options.as_ref().unwrap().executor,
116    );
117    buffer_b.apply_ops(
118        buffer_response_b.operations.into_iter().map(|operation| {
119            let operation = proto::deserialize_operation(operation).unwrap();
120            if let language::Operation::Buffer(operation) = operation {
121                operation
122            } else {
123                unreachable!()
124            }
125        }),
126        None,
127    );
128
129    assert_eq!(buffer_b.text(), "hello, cruel world");
130
131    // Ensure that C fails to open the buffer
132    assert!(
133        db.join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
134            .await
135            .is_err()
136    );
137
138    // Ensure that both collaborators have shown up
139    assert_eq!(
140        buffer_response_b.collaborators,
141        &[
142            rpc::proto::Collaborator {
143                user_id: a_id.to_proto(),
144                peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
145                replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32,
146                is_host: false,
147                committer_name: None,
148                committer_email: None,
149            },
150            rpc::proto::Collaborator {
151                user_id: b_id.to_proto(),
152                peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
153                replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32 + 1,
154                is_host: false,
155                committer_name: None,
156                committer_email: None,
157            }
158        ]
159    );
160
161    // Ensure that get_channel_buffer_collaborators works
162    let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
163    assert_eq!(zed_collaborats, &[a_id, b_id]);
164
165    let left_buffer = db
166        .leave_channel_buffer(zed_id, connection_id_b)
167        .await
168        .unwrap();
169
170    assert_eq!(left_buffer.connections, &[connection_id_a],);
171
172    let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
173    let _ = db
174        .join_channel_buffer(cargo_id, a_id, connection_id_a)
175        .await
176        .unwrap();
177
178    db.leave_channel_buffers(connection_id_a).await.unwrap();
179
180    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
181    let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
182    assert_eq!(zed_collaborators, &[]);
183    assert_eq!(cargo_collaborators, &[]);
184
185    // When everyone has left the channel, the operations are collapsed into
186    // a new base text.
187    let buffer_response_b = db
188        .join_channel_buffer(zed_id, b_id, connection_id_b)
189        .await
190        .unwrap();
191    assert_eq!(buffer_response_b.base_text, "hello, cruel world");
192    assert_eq!(buffer_response_b.operations, &[]);
193}
194
// Hands `test_channel_buffers_last_operations` to the `test_both_dbs!` macro
// together with the two identifiers
// `test_channel_buffers_last_operations_postgres` and
// `test_channel_buffers_last_operations_sqlite`.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it
// emits one test per database backend that calls the shared body — confirm
// against the macro definition.
test_both_dbs!(
    test_channel_buffers_last_operations,
    test_channel_buffers_last_operations_postgres,
    test_channel_buffers_last_operations_sqlite
);
200
201async fn test_channel_buffers_last_operations(db: &Database) {
202    let user_id = db
203        .create_user(
204            "user_a@example.com",
205            None,
206            false,
207            NewUserParams {
208                github_login: "user_a".into(),
209                github_user_id: 101,
210            },
211        )
212        .await
213        .unwrap()
214        .user_id;
215    let observer_id = db
216        .create_user(
217            "user_b@example.com",
218            None,
219            false,
220            NewUserParams {
221                github_login: "user_b".into(),
222                github_user_id: 102,
223            },
224        )
225        .await
226        .unwrap()
227        .user_id;
228    let owner_id = db.create_server("production").await.unwrap().0 as u32;
229    let connection_id = ConnectionId {
230        owner_id,
231        id: user_id.0 as u32,
232    };
233
234    let mut buffers = Vec::new();
235    let mut text_buffers = Vec::new();
236    for i in 0..3 {
237        let channel = db
238            .create_root_channel(&format!("channel-{i}"), user_id)
239            .await
240            .unwrap();
241
242        db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
243            .await
244            .unwrap();
245        db.respond_to_channel_invite(channel, observer_id, true)
246            .await
247            .unwrap();
248
249        let res = db
250            .join_channel_buffer(channel, user_id, connection_id)
251            .await
252            .unwrap();
253
254        buffers.push(
255            db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
256                .await
257                .unwrap(),
258        );
259
260        text_buffers.push(Buffer::new(
261            ReplicaId::new(res.replica_id as u16),
262            text::BufferId::new(1).unwrap(),
263            "".to_string(),
264            &db.test_options.as_ref().unwrap().executor,
265        ));
266    }
267
268    update_buffer(
269        buffers[0].channel_id,
270        user_id,
271        db,
272        vec![
273            text_buffers[0].edit([(0..0, "a")], &db.test_options.as_ref().unwrap().executor),
274            text_buffers[0].edit([(0..0, "b")], &db.test_options.as_ref().unwrap().executor),
275            text_buffers[0].edit([(0..0, "c")], &db.test_options.as_ref().unwrap().executor),
276        ],
277    )
278    .await;
279
280    update_buffer(
281        buffers[1].channel_id,
282        user_id,
283        db,
284        vec![
285            text_buffers[1].edit([(0..0, "d")], &db.test_options.as_ref().unwrap().executor),
286            text_buffers[1].edit([(1..1, "e")], &db.test_options.as_ref().unwrap().executor),
287            text_buffers[1].edit([(2..2, "f")], &db.test_options.as_ref().unwrap().executor),
288        ],
289    )
290    .await;
291
292    // cause buffer 1's epoch to increment.
293    db.leave_channel_buffer(buffers[1].channel_id, connection_id)
294        .await
295        .unwrap();
296    db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
297        .await
298        .unwrap();
299    let replica_id = text_buffers[1].replica_id();
300    text_buffers[1] = Buffer::new(
301        replica_id,
302        text::BufferId::new(1).unwrap(),
303        "def".to_string(),
304        &db.test_options.as_ref().unwrap().executor,
305    );
306    update_buffer(
307        buffers[1].channel_id,
308        user_id,
309        db,
310        vec![
311            text_buffers[1].edit([(0..0, "g")], &db.test_options.as_ref().unwrap().executor),
312            text_buffers[1].edit([(0..0, "h")], &db.test_options.as_ref().unwrap().executor),
313        ],
314    )
315    .await;
316
317    update_buffer(
318        buffers[2].channel_id,
319        user_id,
320        db,
321        vec![text_buffers[2].edit([(0..0, "i")], &db.test_options.as_ref().unwrap().executor)],
322    )
323    .await;
324
325    let channels_for_user = db.get_channels_for_user(user_id).await.unwrap();
326
327    pretty_assertions::assert_eq!(
328        channels_for_user.latest_buffer_versions,
329        [
330            rpc::proto::ChannelBufferVersion {
331                channel_id: buffers[0].channel_id.to_proto(),
332                epoch: 0,
333                version: serialize_version(&text_buffers[0].version())
334                    .into_iter()
335                    .filter(
336                        |vector| vector.replica_id == text_buffers[0].replica_id().as_u16() as u32
337                    )
338                    .collect::<Vec<_>>(),
339            },
340            rpc::proto::ChannelBufferVersion {
341                channel_id: buffers[1].channel_id.to_proto(),
342                epoch: 1,
343                version: serialize_version(&text_buffers[1].version())
344                    .into_iter()
345                    .filter(
346                        |vector| vector.replica_id == text_buffers[1].replica_id().as_u16() as u32
347                    )
348                    .collect::<Vec<_>>(),
349            },
350            rpc::proto::ChannelBufferVersion {
351                channel_id: buffers[2].channel_id.to_proto(),
352                epoch: 0,
353                version: serialize_version(&text_buffers[2].version())
354                    .into_iter()
355                    .filter(
356                        |vector| vector.replica_id == text_buffers[2].replica_id().as_u16() as u32
357                    )
358                    .collect::<Vec<_>>(),
359            },
360        ]
361    );
362}
363
364async fn update_buffer(
365    channel_id: ChannelId,
366    user_id: UserId,
367    db: &Database,
368    operations: Vec<text::Operation>,
369) {
370    let operations = operations
371        .into_iter()
372        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
373        .collect::<Vec<_>>();
374    db.update_channel_buffer(channel_id, user_id, &operations)
375        .await
376        .unwrap();
377}