1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use text::{Buffer, ReplicaId};
5
6test_both_dbs!(
7 test_channel_buffers,
8 test_channel_buffers_postgres,
9 test_channel_buffers_sqlite
10);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 None,
17 false,
18 NewUserParams {
19 github_login: "user_a".into(),
20 github_user_id: 101,
21 },
22 )
23 .await
24 .unwrap()
25 .user_id;
26 let b_id = db
27 .create_user(
28 "user_b@example.com",
29 None,
30 false,
31 NewUserParams {
32 github_login: "user_b".into(),
33 github_user_id: 102,
34 },
35 )
36 .await
37 .unwrap()
38 .user_id;
39
40 // This user will not be a part of the channel
41 let c_id = db
42 .create_user(
43 "user_c@example.com",
44 None,
45 false,
46 NewUserParams {
47 github_login: "user_c".into(),
48 github_user_id: 103,
49 },
50 )
51 .await
52 .unwrap()
53 .user_id;
54
55 let owner_id = db.create_server("production").await.unwrap().0 as u32;
56
57 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
58
59 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
60 .await
61 .unwrap();
62
63 db.respond_to_channel_invite(zed_id, b_id, true)
64 .await
65 .unwrap();
66
67 let connection_id_a = ConnectionId { owner_id, id: 1 };
68 let _ = db
69 .join_channel_buffer(zed_id, a_id, connection_id_a)
70 .await
71 .unwrap();
72
73 let mut buffer_a = Buffer::new(
74 ReplicaId::new(0),
75 text::BufferId::new(1).unwrap(),
76 "".to_string(),
77 );
78 let operations = vec![
79 buffer_a.edit([(0..0, "hello world")]),
80 buffer_a.edit([(5..5, ", cruel")]),
81 buffer_a.edit([(0..5, "goodbye")]),
82 buffer_a.undo().unwrap().1,
83 ];
84 assert_eq!(buffer_a.text(), "hello, cruel world");
85
86 let operations = operations
87 .into_iter()
88 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
89 .collect::<Vec<_>>();
90
91 db.update_channel_buffer(zed_id, a_id, &operations)
92 .await
93 .unwrap();
94
95 let connection_id_b = ConnectionId { owner_id, id: 2 };
96 let buffer_response_b = db
97 .join_channel_buffer(zed_id, b_id, connection_id_b)
98 .await
99 .unwrap();
100
101 let mut buffer_b = Buffer::new(
102 ReplicaId::new(0),
103 text::BufferId::new(1).unwrap(),
104 buffer_response_b.base_text,
105 );
106 buffer_b.apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
107 let operation = proto::deserialize_operation(operation).unwrap();
108 if let language::Operation::Buffer(operation) = operation {
109 operation
110 } else {
111 unreachable!()
112 }
113 }));
114
115 assert_eq!(buffer_b.text(), "hello, cruel world");
116
117 // Ensure that C fails to open the buffer
118 assert!(
119 db.join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
120 .await
121 .is_err()
122 );
123
124 // Ensure that both collaborators have shown up
125 assert_eq!(
126 buffer_response_b.collaborators,
127 &[
128 rpc::proto::Collaborator {
129 user_id: a_id.to_proto(),
130 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
131 replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32,
132 is_host: false,
133 committer_name: None,
134 committer_email: None,
135 },
136 rpc::proto::Collaborator {
137 user_id: b_id.to_proto(),
138 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
139 replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32 + 1,
140 is_host: false,
141 committer_name: None,
142 committer_email: None,
143 }
144 ]
145 );
146
147 // Ensure that get_channel_buffer_collaborators works
148 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
149 assert_eq!(zed_collaborats, &[a_id, b_id]);
150
151 let left_buffer = db
152 .leave_channel_buffer(zed_id, connection_id_b)
153 .await
154 .unwrap();
155
156 assert_eq!(left_buffer.connections, &[connection_id_a],);
157
158 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
159 let _ = db
160 .join_channel_buffer(cargo_id, a_id, connection_id_a)
161 .await
162 .unwrap();
163
164 db.leave_channel_buffers(connection_id_a).await.unwrap();
165
166 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
167 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
168 assert_eq!(zed_collaborators, &[]);
169 assert_eq!(cargo_collaborators, &[]);
170
171 // When everyone has left the channel, the operations are collapsed into
172 // a new base text.
173 let buffer_response_b = db
174 .join_channel_buffer(zed_id, b_id, connection_id_b)
175 .await
176 .unwrap();
177 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
178 assert_eq!(buffer_response_b.operations, &[]);
179}
180
181test_both_dbs!(
182 test_channel_buffers_last_operations,
183 test_channel_buffers_last_operations_postgres,
184 test_channel_buffers_last_operations_sqlite
185);
186
187async fn test_channel_buffers_last_operations(db: &Database) {
188 let user_id = db
189 .create_user(
190 "user_a@example.com",
191 None,
192 false,
193 NewUserParams {
194 github_login: "user_a".into(),
195 github_user_id: 101,
196 },
197 )
198 .await
199 .unwrap()
200 .user_id;
201 let observer_id = db
202 .create_user(
203 "user_b@example.com",
204 None,
205 false,
206 NewUserParams {
207 github_login: "user_b".into(),
208 github_user_id: 102,
209 },
210 )
211 .await
212 .unwrap()
213 .user_id;
214 let owner_id = db.create_server("production").await.unwrap().0 as u32;
215 let connection_id = ConnectionId {
216 owner_id,
217 id: user_id.0 as u32,
218 };
219
220 let mut buffers = Vec::new();
221 let mut text_buffers = Vec::new();
222 for i in 0..3 {
223 let channel = db
224 .create_root_channel(&format!("channel-{i}"), user_id)
225 .await
226 .unwrap();
227
228 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
229 .await
230 .unwrap();
231 db.respond_to_channel_invite(channel, observer_id, true)
232 .await
233 .unwrap();
234
235 let res = db
236 .join_channel_buffer(channel, user_id, connection_id)
237 .await
238 .unwrap();
239
240 buffers.push(
241 db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
242 .await
243 .unwrap(),
244 );
245
246 text_buffers.push(Buffer::new(
247 ReplicaId::new(res.replica_id as u16),
248 text::BufferId::new(1).unwrap(),
249 "".to_string(),
250 ));
251 }
252
253 update_buffer(
254 buffers[0].channel_id,
255 user_id,
256 db,
257 vec![
258 text_buffers[0].edit([(0..0, "a")]),
259 text_buffers[0].edit([(0..0, "b")]),
260 text_buffers[0].edit([(0..0, "c")]),
261 ],
262 )
263 .await;
264
265 update_buffer(
266 buffers[1].channel_id,
267 user_id,
268 db,
269 vec![
270 text_buffers[1].edit([(0..0, "d")]),
271 text_buffers[1].edit([(1..1, "e")]),
272 text_buffers[1].edit([(2..2, "f")]),
273 ],
274 )
275 .await;
276
277 // cause buffer 1's epoch to increment.
278 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
279 .await
280 .unwrap();
281 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
282 .await
283 .unwrap();
284 let replica_id = text_buffers[1].replica_id();
285 text_buffers[1] = Buffer::new(
286 replica_id,
287 text::BufferId::new(1).unwrap(),
288 "def".to_string(),
289 );
290 update_buffer(
291 buffers[1].channel_id,
292 user_id,
293 db,
294 vec![
295 text_buffers[1].edit([(0..0, "g")]),
296 text_buffers[1].edit([(0..0, "h")]),
297 ],
298 )
299 .await;
300
301 update_buffer(
302 buffers[2].channel_id,
303 user_id,
304 db,
305 vec![text_buffers[2].edit([(0..0, "i")])],
306 )
307 .await;
308
309 let channels_for_user = db.get_channels_for_user(user_id).await.unwrap();
310
311 pretty_assertions::assert_eq!(
312 channels_for_user.latest_buffer_versions,
313 [
314 rpc::proto::ChannelBufferVersion {
315 channel_id: buffers[0].channel_id.to_proto(),
316 epoch: 0,
317 version: serialize_version(&text_buffers[0].version())
318 .into_iter()
319 .filter(
320 |vector| vector.replica_id == text_buffers[0].replica_id().as_u16() as u32
321 )
322 .collect::<Vec<_>>(),
323 },
324 rpc::proto::ChannelBufferVersion {
325 channel_id: buffers[1].channel_id.to_proto(),
326 epoch: 1,
327 version: serialize_version(&text_buffers[1].version())
328 .into_iter()
329 .filter(
330 |vector| vector.replica_id == text_buffers[1].replica_id().as_u16() as u32
331 )
332 .collect::<Vec<_>>(),
333 },
334 rpc::proto::ChannelBufferVersion {
335 channel_id: buffers[2].channel_id.to_proto(),
336 epoch: 0,
337 version: serialize_version(&text_buffers[2].version())
338 .into_iter()
339 .filter(
340 |vector| vector.replica_id == text_buffers[2].replica_id().as_u16() as u32
341 )
342 .collect::<Vec<_>>(),
343 },
344 ]
345 );
346}
347
348async fn update_buffer(
349 channel_id: ChannelId,
350 user_id: UserId,
351 db: &Database,
352 operations: Vec<text::Operation>,
353) {
354 let operations = operations
355 .into_iter()
356 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
357 .collect::<Vec<_>>();
358 db.update_channel_buffer(channel_id, user_id, &operations)
359 .await
360 .unwrap();
361}