1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use text::Buffer;
5
6test_both_dbs!(
7 test_channel_buffers,
8 test_channel_buffers_postgres,
9 test_channel_buffers_sqlite
10);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 false,
17 NewUserParams {
18 github_login: "user_a".into(),
19 github_user_id: 101,
20 },
21 )
22 .await
23 .unwrap()
24 .user_id;
25 let b_id = db
26 .create_user(
27 "user_b@example.com",
28 false,
29 NewUserParams {
30 github_login: "user_b".into(),
31 github_user_id: 102,
32 },
33 )
34 .await
35 .unwrap()
36 .user_id;
37
38 // This user will not be a part of the channel
39 let c_id = db
40 .create_user(
41 "user_c@example.com",
42 false,
43 NewUserParams {
44 github_login: "user_c".into(),
45 github_user_id: 102,
46 },
47 )
48 .await
49 .unwrap()
50 .user_id;
51
52 let owner_id = db.create_server("production").await.unwrap().0 as u32;
53
54 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
55
56 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
57 .await
58 .unwrap();
59
60 db.respond_to_channel_invite(zed_id, b_id, true)
61 .await
62 .unwrap();
63
64 let connection_id_a = ConnectionId { owner_id, id: 1 };
65 let _ = db
66 .join_channel_buffer(zed_id, a_id, connection_id_a)
67 .await
68 .unwrap();
69
70 let mut buffer_a = Buffer::new(0, text::BufferId::new(1).unwrap(), "".to_string());
71 let mut operations = Vec::new();
72 operations.push(buffer_a.edit([(0..0, "hello world")]));
73 operations.push(buffer_a.edit([(5..5, ", cruel")]));
74 operations.push(buffer_a.edit([(0..5, "goodbye")]));
75 operations.push(buffer_a.undo().unwrap().1);
76 assert_eq!(buffer_a.text(), "hello, cruel world");
77
78 let operations = operations
79 .into_iter()
80 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
81 .collect::<Vec<_>>();
82
83 db.update_channel_buffer(zed_id, a_id, &operations)
84 .await
85 .unwrap();
86
87 let connection_id_b = ConnectionId { owner_id, id: 2 };
88 let buffer_response_b = db
89 .join_channel_buffer(zed_id, b_id, connection_id_b)
90 .await
91 .unwrap();
92
93 let mut buffer_b = Buffer::new(
94 0,
95 text::BufferId::new(1).unwrap(),
96 buffer_response_b.base_text,
97 );
98 buffer_b
99 .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
100 let operation = proto::deserialize_operation(operation).unwrap();
101 if let language::Operation::Buffer(operation) = operation {
102 operation
103 } else {
104 unreachable!()
105 }
106 }))
107 .unwrap();
108
109 assert_eq!(buffer_b.text(), "hello, cruel world");
110
111 // Ensure that C fails to open the buffer
112 assert!(db
113 .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
114 .await
115 .is_err());
116
117 // Ensure that both collaborators have shown up
118 assert_eq!(
119 buffer_response_b.collaborators,
120 &[
121 rpc::proto::Collaborator {
122 user_id: a_id.to_proto(),
123 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
124 replica_id: 0,
125 },
126 rpc::proto::Collaborator {
127 user_id: b_id.to_proto(),
128 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
129 replica_id: 1,
130 }
131 ]
132 );
133
134 // Ensure that get_channel_buffer_collaborators works
135 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
136 assert_eq!(zed_collaborats, &[a_id, b_id]);
137
138 let left_buffer = db
139 .leave_channel_buffer(zed_id, connection_id_b)
140 .await
141 .unwrap();
142
143 assert_eq!(left_buffer.connections, &[connection_id_a],);
144
145 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
146 let _ = db
147 .join_channel_buffer(cargo_id, a_id, connection_id_a)
148 .await
149 .unwrap();
150
151 db.leave_channel_buffers(connection_id_a).await.unwrap();
152
153 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
154 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
155 assert_eq!(zed_collaborators, &[]);
156 assert_eq!(cargo_collaborators, &[]);
157
158 // When everyone has left the channel, the operations are collapsed into
159 // a new base text.
160 let buffer_response_b = db
161 .join_channel_buffer(zed_id, b_id, connection_id_b)
162 .await
163 .unwrap();
164 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
165 assert_eq!(buffer_response_b.operations, &[]);
166}
167
168test_both_dbs!(
169 test_channel_buffers_last_operations,
170 test_channel_buffers_last_operations_postgres,
171 test_channel_buffers_last_operations_sqlite
172);
173
174async fn test_channel_buffers_last_operations(db: &Database) {
175 let user_id = db
176 .create_user(
177 "user_a@example.com",
178 false,
179 NewUserParams {
180 github_login: "user_a".into(),
181 github_user_id: 101,
182 },
183 )
184 .await
185 .unwrap()
186 .user_id;
187 let observer_id = db
188 .create_user(
189 "user_b@example.com",
190 false,
191 NewUserParams {
192 github_login: "user_b".into(),
193 github_user_id: 102,
194 },
195 )
196 .await
197 .unwrap()
198 .user_id;
199 let owner_id = db.create_server("production").await.unwrap().0 as u32;
200 let connection_id = ConnectionId {
201 owner_id,
202 id: user_id.0 as u32,
203 };
204
205 let mut buffers = Vec::new();
206 let mut text_buffers = Vec::new();
207 for i in 0..3 {
208 let channel = db
209 .create_root_channel(&format!("channel-{i}"), user_id)
210 .await
211 .unwrap();
212
213 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
214 .await
215 .unwrap();
216 db.respond_to_channel_invite(channel, observer_id, true)
217 .await
218 .unwrap();
219
220 db.join_channel_buffer(channel, user_id, connection_id)
221 .await
222 .unwrap();
223
224 buffers.push(
225 db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
226 .await
227 .unwrap(),
228 );
229
230 text_buffers.push(Buffer::new(
231 0,
232 text::BufferId::new(1).unwrap(),
233 "".to_string(),
234 ));
235 }
236
237 let operations = db
238 .transaction(|tx| {
239 let buffers = &buffers;
240 async move {
241 db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
242 .await
243 }
244 })
245 .await
246 .unwrap();
247
248 assert!(operations.is_empty());
249
250 update_buffer(
251 buffers[0].channel_id,
252 user_id,
253 db,
254 vec![
255 text_buffers[0].edit([(0..0, "a")]),
256 text_buffers[0].edit([(0..0, "b")]),
257 text_buffers[0].edit([(0..0, "c")]),
258 ],
259 )
260 .await;
261
262 update_buffer(
263 buffers[1].channel_id,
264 user_id,
265 db,
266 vec![
267 text_buffers[1].edit([(0..0, "d")]),
268 text_buffers[1].edit([(1..1, "e")]),
269 text_buffers[1].edit([(2..2, "f")]),
270 ],
271 )
272 .await;
273
274 // cause buffer 1's epoch to increment.
275 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
276 .await
277 .unwrap();
278 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
279 .await
280 .unwrap();
281 text_buffers[1] = Buffer::new(1, text::BufferId::new(1).unwrap(), "def".to_string());
282 update_buffer(
283 buffers[1].channel_id,
284 user_id,
285 db,
286 vec![
287 text_buffers[1].edit([(0..0, "g")]),
288 text_buffers[1].edit([(0..0, "h")]),
289 ],
290 )
291 .await;
292
293 update_buffer(
294 buffers[2].channel_id,
295 user_id,
296 db,
297 vec![text_buffers[2].edit([(0..0, "i")])],
298 )
299 .await;
300
301 let operations = db
302 .transaction(|tx| {
303 let buffers = &buffers;
304 async move {
305 db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
306 .await
307 }
308 })
309 .await
310 .unwrap();
311 assert_operations(
312 &operations,
313 &[
314 (buffers[1].id, 1, &text_buffers[1]),
315 (buffers[2].id, 0, &text_buffers[2]),
316 ],
317 );
318
319 let operations = db
320 .transaction(|tx| {
321 let buffers = &buffers;
322 async move {
323 db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
324 .await
325 }
326 })
327 .await
328 .unwrap();
329 assert_operations(
330 &operations,
331 &[
332 (buffers[0].id, 0, &text_buffers[0]),
333 (buffers[1].id, 1, &text_buffers[1]),
334 ],
335 );
336
337 let buffer_changes = db
338 .transaction(|tx| {
339 let buffers = &buffers;
340 async move {
341 db.latest_channel_buffer_changes(
342 &[
343 buffers[0].channel_id,
344 buffers[1].channel_id,
345 buffers[2].channel_id,
346 ],
347 &*tx,
348 )
349 .await
350 }
351 })
352 .await
353 .unwrap();
354
355 pretty_assertions::assert_eq!(
356 buffer_changes,
357 [
358 rpc::proto::ChannelBufferVersion {
359 channel_id: buffers[0].channel_id.to_proto(),
360 epoch: 0,
361 version: serialize_version(&text_buffers[0].version()),
362 },
363 rpc::proto::ChannelBufferVersion {
364 channel_id: buffers[1].channel_id.to_proto(),
365 epoch: 1,
366 version: serialize_version(&text_buffers[1].version())
367 .into_iter()
368 .filter(|vector| vector.replica_id
369 == buffer_changes[1].version.first().unwrap().replica_id)
370 .collect::<Vec<_>>(),
371 },
372 rpc::proto::ChannelBufferVersion {
373 channel_id: buffers[2].channel_id.to_proto(),
374 epoch: 0,
375 version: serialize_version(&text_buffers[2].version()),
376 },
377 ]
378 );
379}
380
381async fn update_buffer(
382 channel_id: ChannelId,
383 user_id: UserId,
384 db: &Database,
385 operations: Vec<text::Operation>,
386) {
387 let operations = operations
388 .into_iter()
389 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
390 .collect::<Vec<_>>();
391 db.update_channel_buffer(channel_id, user_id, &operations)
392 .await
393 .unwrap();
394}
395
396fn assert_operations(
397 operations: &[buffer_operation::Model],
398 expected: &[(BufferId, i32, &text::Buffer)],
399) {
400 let actual = operations
401 .iter()
402 .map(|op| buffer_operation::Model {
403 buffer_id: op.buffer_id,
404 epoch: op.epoch,
405 lamport_timestamp: op.lamport_timestamp,
406 replica_id: op.replica_id,
407 value: vec![],
408 })
409 .collect::<Vec<_>>();
410 let expected = expected
411 .iter()
412 .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
413 buffer_id: *buffer_id,
414 epoch: *epoch,
415 lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
416 replica_id: buffer.replica_id() as i32,
417 value: vec![],
418 })
419 .collect::<Vec<_>>();
420 assert_eq!(actual, expected, "unexpected operations")
421}