1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use text::Buffer;
5
// Registers `test_channel_buffers` with the crate's `test_both_dbs!` macro.
// Judging by the argument names, this yields one Postgres-backed and one
// SQLite-backed wrapper around the shared test body below — confirm against
// the macro definition, which is not part of this file.
test_both_dbs!(
    test_channel_buffers,
    test_channel_buffers_postgres,
    test_channel_buffers_sqlite
);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 false,
17 NewUserParams {
18 github_login: "user_a".into(),
19 github_user_id: 101,
20 },
21 )
22 .await
23 .unwrap()
24 .user_id;
25 let b_id = db
26 .create_user(
27 "user_b@example.com",
28 false,
29 NewUserParams {
30 github_login: "user_b".into(),
31 github_user_id: 102,
32 },
33 )
34 .await
35 .unwrap()
36 .user_id;
37
38 // This user will not be a part of the channel
39 let c_id = db
40 .create_user(
41 "user_c@example.com",
42 false,
43 NewUserParams {
44 github_login: "user_c".into(),
45 github_user_id: 102,
46 },
47 )
48 .await
49 .unwrap()
50 .user_id;
51
52 let owner_id = db.create_server("production").await.unwrap().0 as u32;
53
54 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
55
56 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
57 .await
58 .unwrap();
59
60 db.respond_to_channel_invite(zed_id, b_id, true)
61 .await
62 .unwrap();
63
64 let connection_id_a = ConnectionId { owner_id, id: 1 };
65 let _ = db
66 .join_channel_buffer(zed_id, a_id, connection_id_a)
67 .await
68 .unwrap();
69
70 let mut buffer_a = Buffer::new(0, 0, "".to_string());
71 let mut operations = Vec::new();
72 operations.push(buffer_a.edit([(0..0, "hello world")]));
73 operations.push(buffer_a.edit([(5..5, ", cruel")]));
74 operations.push(buffer_a.edit([(0..5, "goodbye")]));
75 operations.push(buffer_a.undo().unwrap().1);
76 assert_eq!(buffer_a.text(), "hello, cruel world");
77
78 let operations = operations
79 .into_iter()
80 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
81 .collect::<Vec<_>>();
82
83 db.update_channel_buffer(zed_id, a_id, &operations)
84 .await
85 .unwrap();
86
87 let connection_id_b = ConnectionId { owner_id, id: 2 };
88 let buffer_response_b = db
89 .join_channel_buffer(zed_id, b_id, connection_id_b)
90 .await
91 .unwrap();
92
93 let mut buffer_b = Buffer::new(0, 0, buffer_response_b.base_text);
94 buffer_b
95 .apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
96 let operation = proto::deserialize_operation(operation).unwrap();
97 if let language::Operation::Buffer(operation) = operation {
98 operation
99 } else {
100 unreachable!()
101 }
102 }))
103 .unwrap();
104
105 assert_eq!(buffer_b.text(), "hello, cruel world");
106
107 // Ensure that C fails to open the buffer
108 assert!(db
109 .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
110 .await
111 .is_err());
112
113 // Ensure that both collaborators have shown up
114 assert_eq!(
115 buffer_response_b.collaborators,
116 &[
117 rpc::proto::Collaborator {
118 user_id: a_id.to_proto(),
119 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
120 replica_id: 0,
121 },
122 rpc::proto::Collaborator {
123 user_id: b_id.to_proto(),
124 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
125 replica_id: 1,
126 }
127 ]
128 );
129
130 // Ensure that get_channel_buffer_collaborators works
131 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
132 assert_eq!(zed_collaborats, &[a_id, b_id]);
133
134 let left_buffer = db
135 .leave_channel_buffer(zed_id, connection_id_b)
136 .await
137 .unwrap();
138
139 assert_eq!(left_buffer.connections, &[connection_id_a],);
140
141 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
142 let _ = db
143 .join_channel_buffer(cargo_id, a_id, connection_id_a)
144 .await
145 .unwrap();
146
147 db.leave_channel_buffers(connection_id_a).await.unwrap();
148
149 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
150 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
151 assert_eq!(zed_collaborators, &[]);
152 assert_eq!(cargo_collaborators, &[]);
153
154 // When everyone has left the channel, the operations are collapsed into
155 // a new base text.
156 let buffer_response_b = db
157 .join_channel_buffer(zed_id, b_id, connection_id_b)
158 .await
159 .unwrap();
160 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
161 assert_eq!(buffer_response_b.operations, &[]);
162}
163
// Registers `test_channel_buffers_last_operations` with the crate's
// `test_both_dbs!` macro. Judging by the argument names, this yields one
// Postgres-backed and one SQLite-backed wrapper around the shared test body
// below — confirm against the macro definition, which is not part of this file.
test_both_dbs!(
    test_channel_buffers_last_operations,
    test_channel_buffers_last_operations_postgres,
    test_channel_buffers_last_operations_sqlite
);
169
170async fn test_channel_buffers_last_operations(db: &Database) {
171 let user_id = db
172 .create_user(
173 "user_a@example.com",
174 false,
175 NewUserParams {
176 github_login: "user_a".into(),
177 github_user_id: 101,
178 },
179 )
180 .await
181 .unwrap()
182 .user_id;
183 let observer_id = db
184 .create_user(
185 "user_b@example.com",
186 false,
187 NewUserParams {
188 github_login: "user_b".into(),
189 github_user_id: 102,
190 },
191 )
192 .await
193 .unwrap()
194 .user_id;
195 let owner_id = db.create_server("production").await.unwrap().0 as u32;
196 let connection_id = ConnectionId {
197 owner_id,
198 id: user_id.0 as u32,
199 };
200
201 let mut buffers = Vec::new();
202 let mut text_buffers = Vec::new();
203 for i in 0..3 {
204 let channel = db
205 .create_root_channel(&format!("channel-{i}"), user_id)
206 .await
207 .unwrap();
208
209 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
210 .await
211 .unwrap();
212 db.respond_to_channel_invite(channel, observer_id, true)
213 .await
214 .unwrap();
215
216 db.join_channel_buffer(channel, user_id, connection_id)
217 .await
218 .unwrap();
219
220 buffers.push(
221 db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
222 .await
223 .unwrap(),
224 );
225
226 text_buffers.push(Buffer::new(0, 0, "".to_string()));
227 }
228
229 let operations = db
230 .transaction(|tx| {
231 let buffers = &buffers;
232 async move {
233 db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
234 .await
235 }
236 })
237 .await
238 .unwrap();
239
240 assert!(operations.is_empty());
241
242 update_buffer(
243 buffers[0].channel_id,
244 user_id,
245 db,
246 vec![
247 text_buffers[0].edit([(0..0, "a")]),
248 text_buffers[0].edit([(0..0, "b")]),
249 text_buffers[0].edit([(0..0, "c")]),
250 ],
251 )
252 .await;
253
254 update_buffer(
255 buffers[1].channel_id,
256 user_id,
257 db,
258 vec![
259 text_buffers[1].edit([(0..0, "d")]),
260 text_buffers[1].edit([(1..1, "e")]),
261 text_buffers[1].edit([(2..2, "f")]),
262 ],
263 )
264 .await;
265
266 // cause buffer 1's epoch to increment.
267 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
268 .await
269 .unwrap();
270 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
271 .await
272 .unwrap();
273 text_buffers[1] = Buffer::new(1, 0, "def".to_string());
274 update_buffer(
275 buffers[1].channel_id,
276 user_id,
277 db,
278 vec![
279 text_buffers[1].edit([(0..0, "g")]),
280 text_buffers[1].edit([(0..0, "h")]),
281 ],
282 )
283 .await;
284
285 update_buffer(
286 buffers[2].channel_id,
287 user_id,
288 db,
289 vec![text_buffers[2].edit([(0..0, "i")])],
290 )
291 .await;
292
293 let operations = db
294 .transaction(|tx| {
295 let buffers = &buffers;
296 async move {
297 db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
298 .await
299 }
300 })
301 .await
302 .unwrap();
303 assert_operations(
304 &operations,
305 &[
306 (buffers[1].id, 1, &text_buffers[1]),
307 (buffers[2].id, 0, &text_buffers[2]),
308 ],
309 );
310
311 let operations = db
312 .transaction(|tx| {
313 let buffers = &buffers;
314 async move {
315 db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
316 .await
317 }
318 })
319 .await
320 .unwrap();
321 assert_operations(
322 &operations,
323 &[
324 (buffers[0].id, 0, &text_buffers[0]),
325 (buffers[1].id, 1, &text_buffers[1]),
326 ],
327 );
328
329 let buffer_changes = db
330 .transaction(|tx| {
331 let buffers = &buffers;
332 async move {
333 db.latest_channel_buffer_changes(
334 &[
335 buffers[0].channel_id,
336 buffers[1].channel_id,
337 buffers[2].channel_id,
338 ],
339 &*tx,
340 )
341 .await
342 }
343 })
344 .await
345 .unwrap();
346
347 pretty_assertions::assert_eq!(
348 buffer_changes,
349 [
350 rpc::proto::ChannelBufferVersion {
351 channel_id: buffers[0].channel_id.to_proto(),
352 epoch: 0,
353 version: serialize_version(&text_buffers[0].version()),
354 },
355 rpc::proto::ChannelBufferVersion {
356 channel_id: buffers[1].channel_id.to_proto(),
357 epoch: 1,
358 version: serialize_version(&text_buffers[1].version())
359 .into_iter()
360 .filter(|vector| vector.replica_id
361 == buffer_changes[1].version.first().unwrap().replica_id)
362 .collect::<Vec<_>>(),
363 },
364 rpc::proto::ChannelBufferVersion {
365 channel_id: buffers[2].channel_id.to_proto(),
366 epoch: 0,
367 version: serialize_version(&text_buffers[2].version()),
368 },
369 ]
370 );
371}
372
373async fn update_buffer(
374 channel_id: ChannelId,
375 user_id: UserId,
376 db: &Database,
377 operations: Vec<text::Operation>,
378) {
379 let operations = operations
380 .into_iter()
381 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
382 .collect::<Vec<_>>();
383 db.update_channel_buffer(channel_id, user_id, &operations)
384 .await
385 .unwrap();
386}
387
388fn assert_operations(
389 operations: &[buffer_operation::Model],
390 expected: &[(BufferId, i32, &text::Buffer)],
391) {
392 let actual = operations
393 .iter()
394 .map(|op| buffer_operation::Model {
395 buffer_id: op.buffer_id,
396 epoch: op.epoch,
397 lamport_timestamp: op.lamport_timestamp,
398 replica_id: op.replica_id,
399 value: vec![],
400 })
401 .collect::<Vec<_>>();
402 let expected = expected
403 .iter()
404 .map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
405 buffer_id: *buffer_id,
406 epoch: *epoch,
407 lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
408 replica_id: buffer.replica_id() as i32,
409 value: vec![],
410 })
411 .collect::<Vec<_>>();
412 assert_eq!(actual, expected, "unexpected operations")
413}