1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use text::Buffer;
5
6test_both_dbs!(
7 test_channel_buffers,
8 test_channel_buffers_postgres,
9 test_channel_buffers_sqlite
10);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 None,
17 false,
18 NewUserParams {
19 github_login: "user_a".into(),
20 github_user_id: 101,
21 },
22 )
23 .await
24 .unwrap()
25 .user_id;
26 let b_id = db
27 .create_user(
28 "user_b@example.com",
29 None,
30 false,
31 NewUserParams {
32 github_login: "user_b".into(),
33 github_user_id: 102,
34 },
35 )
36 .await
37 .unwrap()
38 .user_id;
39
40 // This user will not be a part of the channel
41 let c_id = db
42 .create_user(
43 "user_c@example.com",
44 None,
45 false,
46 NewUserParams {
47 github_login: "user_c".into(),
48 github_user_id: 103,
49 },
50 )
51 .await
52 .unwrap()
53 .user_id;
54
55 let owner_id = db.create_server("production").await.unwrap().0 as u32;
56
57 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
58
59 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
60 .await
61 .unwrap();
62
63 db.respond_to_channel_invite(zed_id, b_id, true)
64 .await
65 .unwrap();
66
67 let connection_id_a = ConnectionId { owner_id, id: 1 };
68 let _ = db
69 .join_channel_buffer(zed_id, a_id, connection_id_a)
70 .await
71 .unwrap();
72
73 let mut buffer_a = Buffer::new(0, text::BufferId::new(1).unwrap(), "".to_string());
74 let operations = vec![
75 buffer_a.edit([(0..0, "hello world")]),
76 buffer_a.edit([(5..5, ", cruel")]),
77 buffer_a.edit([(0..5, "goodbye")]),
78 buffer_a.undo().unwrap().1,
79 ];
80 assert_eq!(buffer_a.text(), "hello, cruel world");
81
82 let operations = operations
83 .into_iter()
84 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
85 .collect::<Vec<_>>();
86
87 db.update_channel_buffer(zed_id, a_id, &operations)
88 .await
89 .unwrap();
90
91 let connection_id_b = ConnectionId { owner_id, id: 2 };
92 let buffer_response_b = db
93 .join_channel_buffer(zed_id, b_id, connection_id_b)
94 .await
95 .unwrap();
96
97 let mut buffer_b = Buffer::new(
98 0,
99 text::BufferId::new(1).unwrap(),
100 buffer_response_b.base_text,
101 );
102 buffer_b.apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
103 let operation = proto::deserialize_operation(operation).unwrap();
104 if let language::Operation::Buffer(operation) = operation {
105 operation
106 } else {
107 unreachable!()
108 }
109 }));
110
111 assert_eq!(buffer_b.text(), "hello, cruel world");
112
113 // Ensure that C fails to open the buffer
114 assert!(
115 db.join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
116 .await
117 .is_err()
118 );
119
120 // Ensure that both collaborators have shown up
121 assert_eq!(
122 buffer_response_b.collaborators,
123 &[
124 rpc::proto::Collaborator {
125 user_id: a_id.to_proto(),
126 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
127 replica_id: 0,
128 is_host: false,
129 },
130 rpc::proto::Collaborator {
131 user_id: b_id.to_proto(),
132 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
133 replica_id: 1,
134 is_host: false,
135 }
136 ]
137 );
138
139 // Ensure that get_channel_buffer_collaborators works
140 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
141 assert_eq!(zed_collaborats, &[a_id, b_id]);
142
143 let left_buffer = db
144 .leave_channel_buffer(zed_id, connection_id_b)
145 .await
146 .unwrap();
147
148 assert_eq!(left_buffer.connections, &[connection_id_a],);
149
150 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
151 let _ = db
152 .join_channel_buffer(cargo_id, a_id, connection_id_a)
153 .await
154 .unwrap();
155
156 db.leave_channel_buffers(connection_id_a).await.unwrap();
157
158 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
159 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
160 assert_eq!(zed_collaborators, &[]);
161 assert_eq!(cargo_collaborators, &[]);
162
163 // When everyone has left the channel, the operations are collapsed into
164 // a new base text.
165 let buffer_response_b = db
166 .join_channel_buffer(zed_id, b_id, connection_id_b)
167 .await
168 .unwrap();
169 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
170 assert_eq!(buffer_response_b.operations, &[]);
171}
172
173test_both_dbs!(
174 test_channel_buffers_last_operations,
175 test_channel_buffers_last_operations_postgres,
176 test_channel_buffers_last_operations_sqlite
177);
178
179async fn test_channel_buffers_last_operations(db: &Database) {
180 let user_id = db
181 .create_user(
182 "user_a@example.com",
183 None,
184 false,
185 NewUserParams {
186 github_login: "user_a".into(),
187 github_user_id: 101,
188 },
189 )
190 .await
191 .unwrap()
192 .user_id;
193 let observer_id = db
194 .create_user(
195 "user_b@example.com",
196 None,
197 false,
198 NewUserParams {
199 github_login: "user_b".into(),
200 github_user_id: 102,
201 },
202 )
203 .await
204 .unwrap()
205 .user_id;
206 let owner_id = db.create_server("production").await.unwrap().0 as u32;
207 let connection_id = ConnectionId {
208 owner_id,
209 id: user_id.0 as u32,
210 };
211
212 let mut buffers = Vec::new();
213 let mut text_buffers = Vec::new();
214 for i in 0..3 {
215 let channel = db
216 .create_root_channel(&format!("channel-{i}"), user_id)
217 .await
218 .unwrap();
219
220 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
221 .await
222 .unwrap();
223 db.respond_to_channel_invite(channel, observer_id, true)
224 .await
225 .unwrap();
226
227 db.join_channel_buffer(channel, user_id, connection_id)
228 .await
229 .unwrap();
230
231 buffers.push(
232 db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
233 .await
234 .unwrap(),
235 );
236
237 text_buffers.push(Buffer::new(
238 0,
239 text::BufferId::new(1).unwrap(),
240 "".to_string(),
241 ));
242 }
243
244 update_buffer(
245 buffers[0].channel_id,
246 user_id,
247 db,
248 vec![
249 text_buffers[0].edit([(0..0, "a")]),
250 text_buffers[0].edit([(0..0, "b")]),
251 text_buffers[0].edit([(0..0, "c")]),
252 ],
253 )
254 .await;
255
256 update_buffer(
257 buffers[1].channel_id,
258 user_id,
259 db,
260 vec![
261 text_buffers[1].edit([(0..0, "d")]),
262 text_buffers[1].edit([(1..1, "e")]),
263 text_buffers[1].edit([(2..2, "f")]),
264 ],
265 )
266 .await;
267
268 // cause buffer 1's epoch to increment.
269 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
270 .await
271 .unwrap();
272 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
273 .await
274 .unwrap();
275 text_buffers[1] = Buffer::new(1, text::BufferId::new(1).unwrap(), "def".to_string());
276 update_buffer(
277 buffers[1].channel_id,
278 user_id,
279 db,
280 vec![
281 text_buffers[1].edit([(0..0, "g")]),
282 text_buffers[1].edit([(0..0, "h")]),
283 ],
284 )
285 .await;
286
287 update_buffer(
288 buffers[2].channel_id,
289 user_id,
290 db,
291 vec![text_buffers[2].edit([(0..0, "i")])],
292 )
293 .await;
294
295 let channels_for_user = db.get_channels_for_user(user_id).await.unwrap();
296
297 pretty_assertions::assert_eq!(
298 channels_for_user.latest_buffer_versions,
299 [
300 rpc::proto::ChannelBufferVersion {
301 channel_id: buffers[0].channel_id.to_proto(),
302 epoch: 0,
303 version: serialize_version(&text_buffers[0].version()),
304 },
305 rpc::proto::ChannelBufferVersion {
306 channel_id: buffers[1].channel_id.to_proto(),
307 epoch: 1,
308 version: serialize_version(&text_buffers[1].version())
309 .into_iter()
310 .filter(|vector| vector.replica_id == text_buffers[1].replica_id() as u32)
311 .collect::<Vec<_>>(),
312 },
313 rpc::proto::ChannelBufferVersion {
314 channel_id: buffers[2].channel_id.to_proto(),
315 epoch: 0,
316 version: serialize_version(&text_buffers[2].version()),
317 },
318 ]
319 );
320}
321
322async fn update_buffer(
323 channel_id: ChannelId,
324 user_id: UserId,
325 db: &Database,
326 operations: Vec<text::Operation>,
327) {
328 let operations = operations
329 .into_iter()
330 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
331 .collect::<Vec<_>>();
332 db.update_channel_buffer(channel_id, user_id, &operations)
333 .await
334 .unwrap();
335}