1use super::*;
2use crate::test_both_dbs;
3use language::proto;
4use text::Buffer;
5
// Hands `test_channel_buffers` to the `test_both_dbs!` macro together with the
// names `test_channel_buffers_postgres` and `test_channel_buffers_sqlite`.
// NOTE(review): presumably the macro emits one test per database backend under
// those names — confirm against the definition of `crate::test_both_dbs`.
test_both_dbs!(
    test_channel_buffers,
    test_channel_buffers_postgres,
    test_channel_buffers_sqlite
);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 false,
17 NewUserParams {
18 github_login: "user_a".into(),
19 github_user_id: 101,
20 invite_count: 0,
21 },
22 )
23 .await
24 .unwrap()
25 .user_id;
26 let b_id = db
27 .create_user(
28 "user_b@example.com",
29 false,
30 NewUserParams {
31 github_login: "user_b".into(),
32 github_user_id: 102,
33 invite_count: 0,
34 },
35 )
36 .await
37 .unwrap()
38 .user_id;
39
40 // This user will not be a part of the channel
41 let c_id = db
42 .create_user(
43 "user_c@example.com",
44 false,
45 NewUserParams {
46 github_login: "user_c".into(),
47 github_user_id: 102,
48 invite_count: 0,
49 },
50 )
51 .await
52 .unwrap()
53 .user_id;
54
55 let owner_id = db.create_server("production").await.unwrap().0 as u32;
56
57 let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap();
58
59 db.invite_channel_member(zed_id, b_id, a_id, false)
60 .await
61 .unwrap();
62
63 db.respond_to_channel_invite(zed_id, b_id, true)
64 .await
65 .unwrap();
66
67 let connection_id_a = ConnectionId { owner_id, id: 1 };
68 let _ = db
69 .join_channel_buffer(zed_id, a_id, connection_id_a)
70 .await
71 .unwrap();
72
73 let mut buffer_a = Buffer::new(0, 0, "".to_string());
74 let mut operations = Vec::new();
75 operations.push(buffer_a.edit([(0..0, "hello world")]));
76 operations.push(buffer_a.edit([(5..5, ", cruel")]));
77 operations.push(buffer_a.edit([(0..5, "goodbye")]));
78 operations.push(buffer_a.undo().unwrap().1);
79 assert_eq!(buffer_a.text(), "hello, cruel world");
80
81 let operations = operations
82 .into_iter()
83 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
84 .collect::<Vec<_>>();
85
86 db.update_channel_buffer(zed_id, a_id, &operations)
87 .await
88 .unwrap();
89
90 let connection_id_b = ConnectionId { owner_id, id: 2 };
91 let buffer_response_b = db
92 .join_channel_buffer(zed_id, b_id, connection_id_b)
93 .await
94 .unwrap();
95
96 let mut buffer_b = Buffer::new(0, 0, buffer_response_b.base_text);
97 buffer_b
98 .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
99 let operation = proto::deserialize_operation(operation).unwrap();
100 if let language::Operation::Buffer(operation) = operation {
101 operation
102 } else {
103 unreachable!()
104 }
105 }))
106 .unwrap();
107
108 assert_eq!(buffer_b.text(), "hello, cruel world");
109
110 // Ensure that C fails to open the buffer
111 assert!(db
112 .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
113 .await
114 .is_err());
115
116 // Ensure that both collaborators have shown up
117 assert_eq!(
118 buffer_response_b.collaborators,
119 &[
120 rpc::proto::Collaborator {
121 user_id: a_id.to_proto(),
122 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
123 replica_id: 0,
124 },
125 rpc::proto::Collaborator {
126 user_id: b_id.to_proto(),
127 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
128 replica_id: 1,
129 }
130 ]
131 );
132
133 // Ensure that get_channel_buffer_collaborators works
134 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
135 assert_eq!(zed_collaborats, &[a_id, b_id]);
136
137 let left_buffer = db
138 .leave_channel_buffer(zed_id, connection_id_b)
139 .await
140 .unwrap();
141
142 assert_eq!(left_buffer.connections, &[connection_id_a],);
143
144 let cargo_id = db.create_root_channel("cargo", "2", a_id).await.unwrap();
145 let _ = db
146 .join_channel_buffer(cargo_id, a_id, connection_id_a)
147 .await
148 .unwrap();
149
150 db.leave_channel_buffers(connection_id_a).await.unwrap();
151
152 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
153 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
154 assert_eq!(zed_collaborators, &[]);
155 assert_eq!(cargo_collaborators, &[]);
156
157 // When everyone has left the channel, the operations are collapsed into
158 // a new base text.
159 let buffer_response_b = db
160 .join_channel_buffer(zed_id, b_id, connection_id_b)
161 .await
162 .unwrap();
163 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
164 assert_eq!(buffer_response_b.operations, &[]);
165}
166
// Hands `test_channel_buffers_diffs` to the `test_both_dbs!` macro together
// with the names `test_channel_buffers_diffs_postgres` and
// `test_channel_buffers_diffs_sqlite`.
// NOTE(review): presumably the macro emits one test per database backend under
// those names — confirm against the definition of `crate::test_both_dbs`.
test_both_dbs!(
    test_channel_buffers_diffs,
    test_channel_buffers_diffs_postgres,
    test_channel_buffers_diffs_sqlite
);
172
173async fn test_channel_buffers_diffs(db: &Database) {
174 panic!("Rewriting the way this works");
175
176 let a_id = db
177 .create_user(
178 "user_a@example.com",
179 false,
180 NewUserParams {
181 github_login: "user_a".into(),
182 github_user_id: 101,
183 invite_count: 0,
184 },
185 )
186 .await
187 .unwrap()
188 .user_id;
189 let b_id = db
190 .create_user(
191 "user_b@example.com",
192 false,
193 NewUserParams {
194 github_login: "user_b".into(),
195 github_user_id: 102,
196 invite_count: 0,
197 },
198 )
199 .await
200 .unwrap()
201 .user_id;
202
203 let owner_id = db.create_server("production").await.unwrap().0 as u32;
204
205 let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap();
206
207 db.invite_channel_member(zed_id, b_id, a_id, false)
208 .await
209 .unwrap();
210
211 db.respond_to_channel_invite(zed_id, b_id, true)
212 .await
213 .unwrap();
214
215 let connection_id_a = ConnectionId {
216 owner_id,
217 id: a_id.0 as u32,
218 };
219 let connection_id_b = ConnectionId {
220 owner_id,
221 id: b_id.0 as u32,
222 };
223
224 // Zero test: A should not register as changed on an unitialized channel buffer
225 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
226
227 let _ = db
228 .join_channel_buffer(zed_id, a_id, connection_id_a)
229 .await
230 .unwrap();
231
232 // Zero test: A should register as changed on an empty channel buffer
233 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
234
235 let mut buffer_a = Buffer::new(0, 0, "".to_string());
236 let mut operations = Vec::new();
237 operations.push(buffer_a.edit([(0..0, "hello world")]));
238 assert_eq!(buffer_a.text(), "hello world");
239
240 let operations = operations
241 .into_iter()
242 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
243 .collect::<Vec<_>>();
244
245 db.update_channel_buffer(zed_id, a_id, &operations)
246 .await
247 .unwrap();
248
249 // Smoke test: Does B register as changed, A as unchanged?
250 assert!(db.test_has_note_changed(b_id, zed_id).await.unwrap());
251
252 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
253
254 db.leave_channel_buffer(zed_id, connection_id_a)
255 .await
256 .unwrap();
257
258 // Snapshotting from leaving the channel buffer should not have a diff
259 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
260
261 let _ = db
262 .join_channel_buffer(zed_id, b_id, connection_id_b)
263 .await
264 .unwrap();
265
266 // B has opened the channel buffer, so we shouldn't have any diff
267 assert!(!db.test_has_note_changed(b_id, zed_id).await.unwrap());
268
269 db.leave_channel_buffer(zed_id, connection_id_b)
270 .await
271 .unwrap();
272
273 // Since B just opened and closed the buffer without editing, neither should have a diff
274 assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
275 assert!(!db.test_has_note_changed(b_id, zed_id).await.unwrap());
276}
277
// Hands `test_channel_buffers_last_operations` to the `test_both_dbs!` macro
// together with the names `test_channel_buffers_last_operations_postgres` and
// `test_channel_buffers_last_operations_sqlite`.
// NOTE(review): presumably the macro emits one test per database backend under
// those names — confirm against the definition of `crate::test_both_dbs`.
test_both_dbs!(
    test_channel_buffers_last_operations,
    test_channel_buffers_last_operations_postgres,
    test_channel_buffers_last_operations_sqlite
);
283
284async fn test_channel_buffers_last_operations(db: &Database) {
285 let user_id = db
286 .create_user(
287 "user_a@example.com",
288 false,
289 NewUserParams {
290 github_login: "user_a".into(),
291 github_user_id: 101,
292 invite_count: 0,
293 },
294 )
295 .await
296 .unwrap()
297 .user_id;
298 let owner_id = db.create_server("production").await.unwrap().0 as u32;
299 let connection_id = ConnectionId {
300 owner_id,
301 id: user_id.0 as u32,
302 };
303
304 let mut buffers = Vec::new();
305 let mut text_buffers = Vec::new();
306 for i in 0..3 {
307 let channel = db
308 .create_root_channel(&format!("channel-{i}"), &format!("room-{i}"), user_id)
309 .await
310 .unwrap();
311
312 db.join_channel_buffer(channel, user_id, connection_id)
313 .await
314 .unwrap();
315
316 buffers.push(
317 db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
318 .await
319 .unwrap(),
320 );
321
322 text_buffers.push(Buffer::new(0, 0, "".to_string()));
323 }
324
325 let operations = db
326 .transaction(|tx| {
327 let buffers = &buffers;
328 async move {
329 db.get_last_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
330 .await
331 }
332 })
333 .await
334 .unwrap();
335
336 assert!(operations.is_empty());
337
338 update_buffer(
339 buffers[0].channel_id,
340 user_id,
341 db,
342 vec![
343 text_buffers[0].edit([(0..0, "a")]),
344 text_buffers[0].edit([(0..0, "b")]),
345 text_buffers[0].edit([(0..0, "c")]),
346 ],
347 )
348 .await;
349
350 update_buffer(
351 buffers[1].channel_id,
352 user_id,
353 db,
354 vec![
355 text_buffers[1].edit([(0..0, "d")]),
356 text_buffers[1].edit([(1..1, "e")]),
357 text_buffers[1].edit([(2..2, "f")]),
358 ],
359 )
360 .await;
361
362 // cause buffer 1's epoch to increment.
363 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
364 .await
365 .unwrap();
366 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
367 .await
368 .unwrap();
369 text_buffers[1] = Buffer::new(1, 0, "def".to_string());
370 update_buffer(
371 buffers[1].channel_id,
372 user_id,
373 db,
374 vec![
375 text_buffers[1].edit([(0..0, "g")]),
376 text_buffers[1].edit([(0..0, "h")]),
377 ],
378 )
379 .await;
380
381 update_buffer(
382 buffers[2].channel_id,
383 user_id,
384 db,
385 vec![text_buffers[2].edit([(0..0, "i")])],
386 )
387 .await;
388
389 let operations = db
390 .transaction(|tx| {
391 let buffers = &buffers;
392 async move {
393 db.get_last_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
394 .await
395 }
396 })
397 .await
398 .unwrap();
399 assert_operations(
400 &operations,
401 &[
402 (buffers[1].id, 1, &text_buffers[1]),
403 (buffers[2].id, 0, &text_buffers[2]),
404 ],
405 );
406
407 let operations = db
408 .transaction(|tx| {
409 let buffers = &buffers;
410 async move {
411 db.get_last_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
412 .await
413 }
414 })
415 .await
416 .unwrap();
417 assert_operations(
418 &operations,
419 &[
420 (buffers[0].id, 0, &text_buffers[0]),
421 (buffers[1].id, 1, &text_buffers[1]),
422 ],
423 );
424
425 async fn update_buffer(
426 channel_id: ChannelId,
427 user_id: UserId,
428 db: &Database,
429 operations: Vec<text::Operation>,
430 ) {
431 let operations = operations
432 .into_iter()
433 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
434 .collect::<Vec<_>>();
435 db.update_channel_buffer(channel_id, user_id, &operations)
436 .await
437 .unwrap();
438 }
439
440 fn assert_operations(
441 operations: &[buffer_operation::Model],
442 expected: &[(BufferId, i32, &text::Buffer)],
443 ) {
444 let actual = operations
445 .iter()
446 .map(|op| buffer_operation::Model {
447 buffer_id: op.buffer_id,
448 epoch: op.epoch,
449 lamport_timestamp: op.lamport_timestamp,
450 replica_id: op.replica_id,
451 value: vec![],
452 })
453 .collect::<Vec<_>>();
454 let expected = expected
455 .iter()
456 .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
457 buffer_id: *buffer_id,
458 epoch: *epoch,
459 lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
460 replica_id: buffer.replica_id() as i32,
461 value: vec![],
462 })
463 .collect::<Vec<_>>();
464 assert_eq!(actual, expected, "unexpected operations")
465 }
466}