1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use rpc::ConnectionId;
5use text::{Buffer, ReplicaId};
6
// Hands `test_channel_buffers` to the `test_both_dbs!` macro together with the
// two names `test_channel_buffers_postgres` and `test_channel_buffers_sqlite`.
// NOTE(review): the macro is defined outside this file; presumably it emits one
// test per database backend that runs the shared body — confirm at its definition.
test_both_dbs!(
    test_channel_buffers,
    test_channel_buffers_postgres,
    test_channel_buffers_sqlite
);
12
13async fn test_channel_buffers(db: &Arc<Database>) {
14 let a_id = db
15 .create_user(
16 "user_a@example.com",
17 None,
18 false,
19 NewUserParams {
20 github_login: "user_a".into(),
21 github_user_id: 101,
22 },
23 )
24 .await
25 .unwrap()
26 .user_id;
27 let b_id = db
28 .create_user(
29 "user_b@example.com",
30 None,
31 false,
32 NewUserParams {
33 github_login: "user_b".into(),
34 github_user_id: 102,
35 },
36 )
37 .await
38 .unwrap()
39 .user_id;
40
41 // This user will not be a part of the channel
42 let c_id = db
43 .create_user(
44 "user_c@example.com",
45 None,
46 false,
47 NewUserParams {
48 github_login: "user_c".into(),
49 github_user_id: 103,
50 },
51 )
52 .await
53 .unwrap()
54 .user_id;
55
56 let owner_id = db.create_server("production").await.unwrap().0 as u32;
57
58 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
59
60 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
61 .await
62 .unwrap();
63
64 db.respond_to_channel_invite(zed_id, b_id, true)
65 .await
66 .unwrap();
67
68 let connection_id_a = ConnectionId { owner_id, id: 1 };
69 let _ = db
70 .join_channel_buffer(zed_id, a_id, connection_id_a)
71 .await
72 .unwrap();
73
74 let mut buffer_a = Buffer::new(
75 ReplicaId::new(0),
76 text::BufferId::new(1).unwrap(),
77 "".to_string(),
78 );
79 let operations = vec![
80 buffer_a.edit([(0..0, "hello world")]),
81 buffer_a.edit([(5..5, ", cruel")]),
82 buffer_a.edit([(0..5, "goodbye")]),
83 buffer_a.undo().unwrap().1,
84 ];
85 assert_eq!(buffer_a.text(), "hello, cruel world");
86
87 let operations = operations
88 .into_iter()
89 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
90 .collect::<Vec<_>>();
91
92 db.update_channel_buffer(zed_id, a_id, &operations)
93 .await
94 .unwrap();
95
96 let connection_id_b = ConnectionId { owner_id, id: 2 };
97 let buffer_response_b = db
98 .join_channel_buffer(zed_id, b_id, connection_id_b)
99 .await
100 .unwrap();
101
102 let mut buffer_b = Buffer::new(
103 ReplicaId::new(0),
104 text::BufferId::new(1).unwrap(),
105 buffer_response_b.base_text,
106 );
107 buffer_b.apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
108 let operation = proto::deserialize_operation(operation).unwrap();
109 if let language::Operation::Buffer(operation) = operation {
110 operation
111 } else {
112 unreachable!()
113 }
114 }));
115
116 assert_eq!(buffer_b.text(), "hello, cruel world");
117
118 // Ensure that C fails to open the buffer
119 assert!(
120 db.join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
121 .await
122 .is_err()
123 );
124
125 // Ensure that both collaborators have shown up
126 assert_eq!(
127 buffer_response_b.collaborators,
128 &[
129 rpc::proto::Collaborator {
130 user_id: a_id.to_proto(),
131 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
132 replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32,
133 is_host: false,
134 committer_name: None,
135 committer_email: None,
136 },
137 rpc::proto::Collaborator {
138 user_id: b_id.to_proto(),
139 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
140 replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32 + 1,
141 is_host: false,
142 committer_name: None,
143 committer_email: None,
144 }
145 ]
146 );
147
148 // Ensure that get_channel_buffer_collaborators works
149 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
150 assert_eq!(zed_collaborats, &[a_id, b_id]);
151
152 let left_buffer = db
153 .leave_channel_buffer(zed_id, connection_id_b)
154 .await
155 .unwrap();
156
157 assert_eq!(left_buffer.connections, &[connection_id_a],);
158
159 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
160 let _ = db
161 .join_channel_buffer(cargo_id, a_id, connection_id_a)
162 .await
163 .unwrap();
164
165 db.leave_channel_buffers(connection_id_a).await.unwrap();
166
167 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
168 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
169 assert_eq!(zed_collaborators, &[]);
170 assert_eq!(cargo_collaborators, &[]);
171
172 // When everyone has left the channel, the operations are collapsed into
173 // a new base text.
174 let buffer_response_b = db
175 .join_channel_buffer(zed_id, b_id, connection_id_b)
176 .await
177 .unwrap();
178 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
179 assert_eq!(buffer_response_b.operations, &[]);
180}
181
// Hands `test_channel_buffers_last_operations` to the `test_both_dbs!` macro
// together with the `_postgres` and `_sqlite` suffixed names.
// NOTE(review): the macro is defined outside this file; presumably it emits one
// test per database backend that runs the shared body — confirm at its definition.
test_both_dbs!(
    test_channel_buffers_last_operations,
    test_channel_buffers_last_operations_postgres,
    test_channel_buffers_last_operations_sqlite
);
187
188async fn test_channel_buffers_last_operations(db: &Database) {
189 let user_id = db
190 .create_user(
191 "user_a@example.com",
192 None,
193 false,
194 NewUserParams {
195 github_login: "user_a".into(),
196 github_user_id: 101,
197 },
198 )
199 .await
200 .unwrap()
201 .user_id;
202 let observer_id = db
203 .create_user(
204 "user_b@example.com",
205 None,
206 false,
207 NewUserParams {
208 github_login: "user_b".into(),
209 github_user_id: 102,
210 },
211 )
212 .await
213 .unwrap()
214 .user_id;
215 let owner_id = db.create_server("production").await.unwrap().0 as u32;
216 let connection_id = ConnectionId {
217 owner_id,
218 id: user_id.0 as u32,
219 };
220
221 let mut buffers = Vec::new();
222 let mut text_buffers = Vec::new();
223 for i in 0..3 {
224 let channel = db
225 .create_root_channel(&format!("channel-{i}"), user_id)
226 .await
227 .unwrap();
228
229 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
230 .await
231 .unwrap();
232 db.respond_to_channel_invite(channel, observer_id, true)
233 .await
234 .unwrap();
235
236 let res = db
237 .join_channel_buffer(channel, user_id, connection_id)
238 .await
239 .unwrap();
240
241 buffers.push(
242 db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
243 .await
244 .unwrap(),
245 );
246
247 text_buffers.push(Buffer::new(
248 ReplicaId::new(res.replica_id as u16),
249 text::BufferId::new(1).unwrap(),
250 "".to_string(),
251 ));
252 }
253
254 update_buffer(
255 buffers[0].channel_id,
256 user_id,
257 db,
258 vec![
259 text_buffers[0].edit([(0..0, "a")]),
260 text_buffers[0].edit([(0..0, "b")]),
261 text_buffers[0].edit([(0..0, "c")]),
262 ],
263 )
264 .await;
265
266 update_buffer(
267 buffers[1].channel_id,
268 user_id,
269 db,
270 vec![
271 text_buffers[1].edit([(0..0, "d")]),
272 text_buffers[1].edit([(1..1, "e")]),
273 text_buffers[1].edit([(2..2, "f")]),
274 ],
275 )
276 .await;
277
278 // cause buffer 1's epoch to increment.
279 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
280 .await
281 .unwrap();
282 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
283 .await
284 .unwrap();
285 let replica_id = text_buffers[1].replica_id();
286 text_buffers[1] = Buffer::new(
287 replica_id,
288 text::BufferId::new(1).unwrap(),
289 "def".to_string(),
290 );
291 update_buffer(
292 buffers[1].channel_id,
293 user_id,
294 db,
295 vec![
296 text_buffers[1].edit([(0..0, "g")]),
297 text_buffers[1].edit([(0..0, "h")]),
298 ],
299 )
300 .await;
301
302 update_buffer(
303 buffers[2].channel_id,
304 user_id,
305 db,
306 vec![text_buffers[2].edit([(0..0, "i")])],
307 )
308 .await;
309
310 let channels_for_user = db.get_channels_for_user(user_id).await.unwrap();
311
312 pretty_assertions::assert_eq!(
313 channels_for_user.latest_buffer_versions,
314 [
315 rpc::proto::ChannelBufferVersion {
316 channel_id: buffers[0].channel_id.to_proto(),
317 epoch: 0,
318 version: serialize_version(&text_buffers[0].version())
319 .into_iter()
320 .filter(
321 |vector| vector.replica_id == text_buffers[0].replica_id().as_u16() as u32
322 )
323 .collect::<Vec<_>>(),
324 },
325 rpc::proto::ChannelBufferVersion {
326 channel_id: buffers[1].channel_id.to_proto(),
327 epoch: 1,
328 version: serialize_version(&text_buffers[1].version())
329 .into_iter()
330 .filter(
331 |vector| vector.replica_id == text_buffers[1].replica_id().as_u16() as u32
332 )
333 .collect::<Vec<_>>(),
334 },
335 rpc::proto::ChannelBufferVersion {
336 channel_id: buffers[2].channel_id.to_proto(),
337 epoch: 0,
338 version: serialize_version(&text_buffers[2].version())
339 .into_iter()
340 .filter(
341 |vector| vector.replica_id == text_buffers[2].replica_id().as_u16() as u32
342 )
343 .collect::<Vec<_>>(),
344 },
345 ]
346 );
347}
348
349async fn update_buffer(
350 channel_id: ChannelId,
351 user_id: UserId,
352 db: &Database,
353 operations: Vec<text::Operation>,
354) {
355 let operations = operations
356 .into_iter()
357 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
358 .collect::<Vec<_>>();
359 db.update_channel_buffer(channel_id, user_id, &operations)
360 .await
361 .unwrap();
362}