1use super::*;
2use crate::test_both_dbs;
3use language::proto;
4use text::Buffer;
5
6test_both_dbs!(
7 test_channel_buffers,
8 test_channel_buffers_postgres,
9 test_channel_buffers_sqlite
10);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 false,
17 NewUserParams {
18 github_login: "user_a".into(),
19 github_user_id: 101,
20 invite_count: 0,
21 },
22 )
23 .await
24 .unwrap()
25 .user_id;
26 let b_id = db
27 .create_user(
28 "user_b@example.com",
29 false,
30 NewUserParams {
31 github_login: "user_b".into(),
32 github_user_id: 102,
33 invite_count: 0,
34 },
35 )
36 .await
37 .unwrap()
38 .user_id;
39
40 // This user will not be a part of the channel
41 let c_id = db
42 .create_user(
43 "user_c@example.com",
44 false,
45 NewUserParams {
46 github_login: "user_c".into(),
47 github_user_id: 102,
48 invite_count: 0,
49 },
50 )
51 .await
52 .unwrap()
53 .user_id;
54
55 let owner_id = db.create_server("production").await.unwrap().0 as u32;
56
57 let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap();
58
59 db.invite_channel_member(zed_id, b_id, a_id, false)
60 .await
61 .unwrap();
62
63 db.respond_to_channel_invite(zed_id, b_id, true)
64 .await
65 .unwrap();
66
67 let connection_id_a = ConnectionId { owner_id, id: 1 };
68 let _ = db
69 .join_channel_buffer(zed_id, a_id, connection_id_a)
70 .await
71 .unwrap();
72
73 let mut buffer_a = Buffer::new(0, 0, "".to_string());
74 let mut operations = Vec::new();
75 operations.push(buffer_a.edit([(0..0, "hello world")]));
76 operations.push(buffer_a.edit([(5..5, ", cruel")]));
77 operations.push(buffer_a.edit([(0..5, "goodbye")]));
78 operations.push(buffer_a.undo().unwrap().1);
79 assert_eq!(buffer_a.text(), "hello, cruel world");
80
81 let operations = operations
82 .into_iter()
83 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
84 .collect::<Vec<_>>();
85
86 db.update_channel_buffer(zed_id, a_id, &operations)
87 .await
88 .unwrap();
89
90 let connection_id_b = ConnectionId { owner_id, id: 2 };
91 let buffer_response_b = db
92 .join_channel_buffer(zed_id, b_id, connection_id_b)
93 .await
94 .unwrap();
95
96 let mut buffer_b = Buffer::new(0, 0, buffer_response_b.base_text);
97 buffer_b
98 .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
99 let operation = proto::deserialize_operation(operation).unwrap();
100 if let language::Operation::Buffer(operation) = operation {
101 operation
102 } else {
103 unreachable!()
104 }
105 }))
106 .unwrap();
107
108 assert_eq!(buffer_b.text(), "hello, cruel world");
109
110 // Ensure that C fails to open the buffer
111 assert!(db
112 .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
113 .await
114 .is_err());
115
116 // Ensure that both collaborators have shown up
117 assert_eq!(
118 buffer_response_b.collaborators,
119 &[
120 rpc::proto::Collaborator {
121 user_id: a_id.to_proto(),
122 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
123 replica_id: 0,
124 },
125 rpc::proto::Collaborator {
126 user_id: b_id.to_proto(),
127 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
128 replica_id: 1,
129 }
130 ]
131 );
132
133 // Ensure that get_channel_buffer_collaborators works
134 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
135 assert_eq!(zed_collaborats, &[a_id, b_id]);
136
137 let left_buffer = db
138 .leave_channel_buffer(zed_id, connection_id_b)
139 .await
140 .unwrap();
141
142 assert_eq!(left_buffer.connections, &[connection_id_a],);
143
144 let cargo_id = db.create_root_channel("cargo", "2", a_id).await.unwrap();
145 let _ = db
146 .join_channel_buffer(cargo_id, a_id, connection_id_a)
147 .await
148 .unwrap();
149
150 db.leave_channel_buffers(connection_id_a).await.unwrap();
151
152 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
153 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
154 assert_eq!(zed_collaborators, &[]);
155 assert_eq!(cargo_collaborators, &[]);
156
157 // When everyone has left the channel, the operations are collapsed into
158 // a new base text.
159 let buffer_response_b = db
160 .join_channel_buffer(zed_id, b_id, connection_id_b)
161 .await
162 .unwrap();
163 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
164 assert_eq!(buffer_response_b.operations, &[]);
165}
166
167test_both_dbs!(
168 test_channel_buffers_diffs,
169 test_channel_buffers_diffs_postgres,
170 test_channel_buffers_diffs_sqlite
171);
172
173async fn test_channel_buffers_diffs(db: &Database) {
174 let a_id = db
175 .create_user(
176 "user_a@example.com",
177 false,
178 NewUserParams {
179 github_login: "user_a".into(),
180 github_user_id: 101,
181 invite_count: 0,
182 },
183 )
184 .await
185 .unwrap()
186 .user_id;
187 let b_id = db
188 .create_user(
189 "user_b@example.com",
190 false,
191 NewUserParams {
192 github_login: "user_b".into(),
193 github_user_id: 102,
194 invite_count: 0,
195 },
196 )
197 .await
198 .unwrap()
199 .user_id;
200
201 let owner_id = db.create_server("production").await.unwrap().0 as u32;
202
203 let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap();
204
205 db.invite_channel_member(zed_id, b_id, a_id, false)
206 .await
207 .unwrap();
208
209 db.respond_to_channel_invite(zed_id, b_id, true)
210 .await
211 .unwrap();
212
213 let connection_id_a = ConnectionId {
214 owner_id,
215 id: a_id.0 as u32,
216 };
217 let connection_id_b = ConnectionId {
218 owner_id,
219 id: b_id.0 as u32,
220 };
221
222 // Zero test: A should not register as changed on an unitialized channel buffer
223 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
224
225 let _ = db
226 .join_channel_buffer(zed_id, a_id, connection_id_a)
227 .await
228 .unwrap();
229
230 // Zero test: A should register as changed on an empty channel buffer
231 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
232
233 let mut buffer_a = Buffer::new(0, 0, "".to_string());
234 let mut operations = Vec::new();
235 operations.push(buffer_a.edit([(0..0, "hello world")]));
236 assert_eq!(buffer_a.text(), "hello world");
237
238 let operations = operations
239 .into_iter()
240 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
241 .collect::<Vec<_>>();
242
243 db.update_channel_buffer(zed_id, a_id, &operations)
244 .await
245 .unwrap();
246
247 // Smoke test: Does B register as changed, A as unchanged?
248 assert!(db.test_has_note_changed(b_id, zed_id).await.unwrap());
249
250 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
251
252 db.leave_channel_buffer(zed_id, connection_id_a)
253 .await
254 .unwrap();
255
256 // Snapshotting from leaving the channel buffer should not have a diff
257 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
258
259 let _ = db
260 .join_channel_buffer(zed_id, b_id, connection_id_b)
261 .await
262 .unwrap();
263
264 // B has opened the channel buffer, so we shouldn't have any diff
265 assert!(!db.test_has_note_changed(b_id, zed_id).await.unwrap());
266
267 db.leave_channel_buffer(zed_id, connection_id_b)
268 .await
269 .unwrap();
270
271 // Since B just opened and closed the buffer without editing, neither should have a diff
272 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
273 assert!(!db.test_has_note_changed(b_id, zed_id).await.unwrap());
274}
275
276test_both_dbs!(
277 test_channel_buffers_last_operations,
278 test_channel_buffers_last_operations_postgres,
279 test_channel_buffers_last_operations_sqlite
280);
281
282async fn test_channel_buffers_last_operations(db: &Database) {
283 let user_id = db
284 .create_user(
285 "user_a@example.com",
286 false,
287 NewUserParams {
288 github_login: "user_a".into(),
289 github_user_id: 101,
290 invite_count: 0,
291 },
292 )
293 .await
294 .unwrap()
295 .user_id;
296 let owner_id = db.create_server("production").await.unwrap().0 as u32;
297 let connection_id = ConnectionId {
298 owner_id,
299 id: user_id.0 as u32,
300 };
301
302 let mut buffers = Vec::new();
303 let mut text_buffers = Vec::new();
304 for i in 0..3 {
305 let channel = db
306 .create_root_channel(&format!("channel-{i}"), &format!("room-{i}"), user_id)
307 .await
308 .unwrap();
309
310 db.join_channel_buffer(channel, user_id, connection_id)
311 .await
312 .unwrap();
313
314 buffers.push(
315 db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
316 .await
317 .unwrap(),
318 );
319
320 text_buffers.push(Buffer::new(0, 0, "".to_string()));
321 }
322
323 let operations = db
324 .transaction(|tx| {
325 let buffers = &buffers;
326 async move {
327 db.get_last_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
328 .await
329 }
330 })
331 .await
332 .unwrap();
333
334 assert!(operations.is_empty());
335
336 update_buffer(
337 buffers[0].channel_id,
338 user_id,
339 db,
340 vec![
341 text_buffers[0].edit([(0..0, "a")]),
342 text_buffers[0].edit([(0..0, "b")]),
343 text_buffers[0].edit([(0..0, "c")]),
344 ],
345 )
346 .await;
347
348 update_buffer(
349 buffers[1].channel_id,
350 user_id,
351 db,
352 vec![
353 text_buffers[1].edit([(0..0, "d")]),
354 text_buffers[1].edit([(1..1, "e")]),
355 text_buffers[1].edit([(2..2, "f")]),
356 ],
357 )
358 .await;
359
360 // cause buffer 1's epoch to increment.
361 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
362 .await
363 .unwrap();
364 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
365 .await
366 .unwrap();
367 text_buffers[1] = Buffer::new(1, 0, "def".to_string());
368 update_buffer(
369 buffers[1].channel_id,
370 user_id,
371 db,
372 vec![
373 text_buffers[1].edit([(0..0, "g")]),
374 text_buffers[1].edit([(0..0, "h")]),
375 ],
376 )
377 .await;
378
379 update_buffer(
380 buffers[2].channel_id,
381 user_id,
382 db,
383 vec![text_buffers[2].edit([(0..0, "i")])],
384 )
385 .await;
386
387 let operations = db
388 .transaction(|tx| {
389 let buffers = &buffers;
390 async move {
391 db.get_last_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
392 .await
393 }
394 })
395 .await
396 .unwrap();
397 assert_operations(
398 &operations,
399 &[
400 (buffers[1].id, 1, &text_buffers[1]),
401 (buffers[2].id, 0, &text_buffers[2]),
402 ],
403 );
404
405 let operations = db
406 .transaction(|tx| {
407 let buffers = &buffers;
408 async move {
409 db.get_last_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
410 .await
411 }
412 })
413 .await
414 .unwrap();
415 assert_operations(
416 &operations,
417 &[
418 (buffers[0].id, 0, &text_buffers[0]),
419 (buffers[1].id, 1, &text_buffers[1]),
420 ],
421 );
422
423 async fn update_buffer(
424 channel_id: ChannelId,
425 user_id: UserId,
426 db: &Database,
427 operations: Vec<text::Operation>,
428 ) {
429 let operations = operations
430 .into_iter()
431 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
432 .collect::<Vec<_>>();
433 db.update_channel_buffer(channel_id, user_id, &operations)
434 .await
435 .unwrap();
436 }
437
438 fn assert_operations(
439 operations: &[buffer_operation::Model],
440 expected: &[(BufferId, i32, &text::Buffer)],
441 ) {
442 let actual = operations
443 .iter()
444 .map(|op| buffer_operation::Model {
445 buffer_id: op.buffer_id,
446 epoch: op.epoch,
447 lamport_timestamp: op.lamport_timestamp,
448 replica_id: op.replica_id,
449 value: vec![],
450 })
451 .collect::<Vec<_>>();
452 let expected = expected
453 .iter()
454 .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
455 buffer_id: *buffer_id,
456 epoch: *epoch,
457 lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
458 replica_id: buffer.replica_id() as i32,
459 value: vec![],
460 })
461 .collect::<Vec<_>>();
462 assert_eq!(actual, expected, "unexpected operations")
463 }
464}