1use super::*;
2use crate::test_both_dbs;
3use language::proto::{self, serialize_version};
4use text::Buffer;
5
6test_both_dbs!(
7 test_channel_buffers,
8 test_channel_buffers_postgres,
9 test_channel_buffers_sqlite
10);
11
12async fn test_channel_buffers(db: &Arc<Database>) {
13 let a_id = db
14 .create_user(
15 "user_a@example.com",
16 false,
17 NewUserParams {
18 github_login: "user_a".into(),
19 github_user_id: 101,
20 },
21 )
22 .await
23 .unwrap()
24 .user_id;
25 let b_id = db
26 .create_user(
27 "user_b@example.com",
28 false,
29 NewUserParams {
30 github_login: "user_b".into(),
31 github_user_id: 102,
32 },
33 )
34 .await
35 .unwrap()
36 .user_id;
37
38 // This user will not be a part of the channel
39 let c_id = db
40 .create_user(
41 "user_c@example.com",
42 false,
43 NewUserParams {
44 github_login: "user_c".into(),
45 github_user_id: 103,
46 },
47 )
48 .await
49 .unwrap()
50 .user_id;
51
52 let owner_id = db.create_server("production").await.unwrap().0 as u32;
53
54 let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
55
56 db.invite_channel_member(zed_id, b_id, a_id, ChannelRole::Member)
57 .await
58 .unwrap();
59
60 db.respond_to_channel_invite(zed_id, b_id, true)
61 .await
62 .unwrap();
63
64 let connection_id_a = ConnectionId { owner_id, id: 1 };
65 let _ = db
66 .join_channel_buffer(zed_id, a_id, connection_id_a)
67 .await
68 .unwrap();
69
70 let mut buffer_a = Buffer::new(0, text::BufferId::new(1).unwrap(), "".to_string());
71 let operations = vec![
72 buffer_a.edit([(0..0, "hello world")]),
73 buffer_a.edit([(5..5, ", cruel")]),
74 buffer_a.edit([(0..5, "goodbye")]),
75 buffer_a.undo().unwrap().1,
76 ];
77 assert_eq!(buffer_a.text(), "hello, cruel world");
78
79 let operations = operations
80 .into_iter()
81 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
82 .collect::<Vec<_>>();
83
84 db.update_channel_buffer(zed_id, a_id, &operations)
85 .await
86 .unwrap();
87
88 let connection_id_b = ConnectionId { owner_id, id: 2 };
89 let buffer_response_b = db
90 .join_channel_buffer(zed_id, b_id, connection_id_b)
91 .await
92 .unwrap();
93
94 let mut buffer_b = Buffer::new(
95 0,
96 text::BufferId::new(1).unwrap(),
97 buffer_response_b.base_text,
98 );
99 buffer_b.apply_ops(buffer_response_b.operations.into_iter().map(|operation| {
100 let operation = proto::deserialize_operation(operation).unwrap();
101 if let language::Operation::Buffer(operation) = operation {
102 operation
103 } else {
104 unreachable!()
105 }
106 }));
107
108 assert_eq!(buffer_b.text(), "hello, cruel world");
109
110 // Ensure that C fails to open the buffer
111 assert!(db
112 .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
113 .await
114 .is_err());
115
116 // Ensure that both collaborators have shown up
117 assert_eq!(
118 buffer_response_b.collaborators,
119 &[
120 rpc::proto::Collaborator {
121 user_id: a_id.to_proto(),
122 peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
123 replica_id: 0,
124 is_host: false,
125 },
126 rpc::proto::Collaborator {
127 user_id: b_id.to_proto(),
128 peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
129 replica_id: 1,
130 is_host: false,
131 }
132 ]
133 );
134
135 // Ensure that get_channel_buffer_collaborators works
136 let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
137 assert_eq!(zed_collaborats, &[a_id, b_id]);
138
139 let left_buffer = db
140 .leave_channel_buffer(zed_id, connection_id_b)
141 .await
142 .unwrap();
143
144 assert_eq!(left_buffer.connections, &[connection_id_a],);
145
146 let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap();
147 let _ = db
148 .join_channel_buffer(cargo_id, a_id, connection_id_a)
149 .await
150 .unwrap();
151
152 db.leave_channel_buffers(connection_id_a).await.unwrap();
153
154 let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
155 let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
156 assert_eq!(zed_collaborators, &[]);
157 assert_eq!(cargo_collaborators, &[]);
158
159 // When everyone has left the channel, the operations are collapsed into
160 // a new base text.
161 let buffer_response_b = db
162 .join_channel_buffer(zed_id, b_id, connection_id_b)
163 .await
164 .unwrap();
165 assert_eq!(buffer_response_b.base_text, "hello, cruel world");
166 assert_eq!(buffer_response_b.operations, &[]);
167}
168
169test_both_dbs!(
170 test_channel_buffers_last_operations,
171 test_channel_buffers_last_operations_postgres,
172 test_channel_buffers_last_operations_sqlite
173);
174
175async fn test_channel_buffers_last_operations(db: &Database) {
176 let user_id = db
177 .create_user(
178 "user_a@example.com",
179 false,
180 NewUserParams {
181 github_login: "user_a".into(),
182 github_user_id: 101,
183 },
184 )
185 .await
186 .unwrap()
187 .user_id;
188 let observer_id = db
189 .create_user(
190 "user_b@example.com",
191 false,
192 NewUserParams {
193 github_login: "user_b".into(),
194 github_user_id: 102,
195 },
196 )
197 .await
198 .unwrap()
199 .user_id;
200 let owner_id = db.create_server("production").await.unwrap().0 as u32;
201 let connection_id = ConnectionId {
202 owner_id,
203 id: user_id.0 as u32,
204 };
205
206 let mut buffers = Vec::new();
207 let mut text_buffers = Vec::new();
208 for i in 0..3 {
209 let channel = db
210 .create_root_channel(&format!("channel-{i}"), user_id)
211 .await
212 .unwrap();
213
214 db.invite_channel_member(channel, observer_id, user_id, ChannelRole::Member)
215 .await
216 .unwrap();
217 db.respond_to_channel_invite(channel, observer_id, true)
218 .await
219 .unwrap();
220
221 db.join_channel_buffer(channel, user_id, connection_id)
222 .await
223 .unwrap();
224
225 buffers.push(
226 db.transaction(|tx| async move { db.get_channel_buffer(channel, &tx).await })
227 .await
228 .unwrap(),
229 );
230
231 text_buffers.push(Buffer::new(
232 0,
233 text::BufferId::new(1).unwrap(),
234 "".to_string(),
235 ));
236 }
237
238 update_buffer(
239 buffers[0].channel_id,
240 user_id,
241 db,
242 vec![
243 text_buffers[0].edit([(0..0, "a")]),
244 text_buffers[0].edit([(0..0, "b")]),
245 text_buffers[0].edit([(0..0, "c")]),
246 ],
247 )
248 .await;
249
250 update_buffer(
251 buffers[1].channel_id,
252 user_id,
253 db,
254 vec![
255 text_buffers[1].edit([(0..0, "d")]),
256 text_buffers[1].edit([(1..1, "e")]),
257 text_buffers[1].edit([(2..2, "f")]),
258 ],
259 )
260 .await;
261
262 // cause buffer 1's epoch to increment.
263 db.leave_channel_buffer(buffers[1].channel_id, connection_id)
264 .await
265 .unwrap();
266 db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
267 .await
268 .unwrap();
269 text_buffers[1] = Buffer::new(1, text::BufferId::new(1).unwrap(), "def".to_string());
270 update_buffer(
271 buffers[1].channel_id,
272 user_id,
273 db,
274 vec![
275 text_buffers[1].edit([(0..0, "g")]),
276 text_buffers[1].edit([(0..0, "h")]),
277 ],
278 )
279 .await;
280
281 update_buffer(
282 buffers[2].channel_id,
283 user_id,
284 db,
285 vec![text_buffers[2].edit([(0..0, "i")])],
286 )
287 .await;
288
289 let channels_for_user = db.get_channels_for_user(user_id).await.unwrap();
290
291 pretty_assertions::assert_eq!(
292 channels_for_user.latest_buffer_versions,
293 [
294 rpc::proto::ChannelBufferVersion {
295 channel_id: buffers[0].channel_id.to_proto(),
296 epoch: 0,
297 version: serialize_version(&text_buffers[0].version()),
298 },
299 rpc::proto::ChannelBufferVersion {
300 channel_id: buffers[1].channel_id.to_proto(),
301 epoch: 1,
302 version: serialize_version(&text_buffers[1].version())
303 .into_iter()
304 .filter(|vector| vector.replica_id == text_buffers[1].replica_id() as u32)
305 .collect::<Vec<_>>(),
306 },
307 rpc::proto::ChannelBufferVersion {
308 channel_id: buffers[2].channel_id.to_proto(),
309 epoch: 0,
310 version: serialize_version(&text_buffers[2].version()),
311 },
312 ]
313 );
314}
315
316async fn update_buffer(
317 channel_id: ChannelId,
318 user_id: UserId,
319 db: &Database,
320 operations: Vec<text::Operation>,
321) {
322 let operations = operations
323 .into_iter()
324 .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
325 .collect::<Vec<_>>();
326 db.update_channel_buffer(channel_id, user_id, &operations)
327 .await
328 .unwrap();
329}