1use super::{run_randomized_test, RandomizedTest, TestClient, TestError, TestServer, UserTestPlan};
2use anyhow::Result;
3use async_trait::async_trait;
4use gpui::{executor::Deterministic, TestAppContext};
5use rand::prelude::*;
6use serde_derive::{Deserialize, Serialize};
7use std::{ops::Range, rc::Rc, sync::Arc};
8use text::Bias;
9
10#[gpui::test(
11 iterations = 100,
12 on_failure = "crate::tests::save_randomized_test_plan"
13)]
14async fn test_random_channel_buffers(
15 cx: &mut TestAppContext,
16 deterministic: Arc<Deterministic>,
17 rng: StdRng,
18) {
19 run_randomized_test::<RandomChannelBufferTest>(cx, deterministic, rng).await;
20}
21
22struct RandomChannelBufferTest;
23
24#[derive(Clone, Serialize, Deserialize)]
25enum ChannelBufferOperation {
26 JoinChannelNotes {
27 channel_name: String,
28 },
29 LeaveChannelNotes {
30 channel_name: String,
31 },
32 EditChannelNotes {
33 channel_name: String,
34 edits: Vec<(Range<usize>, Arc<str>)>,
35 },
36 Noop,
37}
38
39const CHANNEL_COUNT: usize = 3;
40
41#[async_trait(?Send)]
42impl RandomizedTest for RandomChannelBufferTest {
43 type Operation = ChannelBufferOperation;
44
45 async fn initialize(server: &mut TestServer, users: &[UserTestPlan]) {
46 let db = &server.app_state.db;
47 for ix in 0..CHANNEL_COUNT {
48 let id = db
49 .create_channel(
50 &format!("channel-{ix}"),
51 None,
52 &format!("livekit-room-{ix}"),
53 users[0].user_id,
54 )
55 .await
56 .unwrap();
57 for user in &users[1..] {
58 db.invite_channel_member(id, user.user_id, users[0].user_id, false)
59 .await
60 .unwrap();
61 db.respond_to_channel_invite(id, user.user_id, true)
62 .await
63 .unwrap();
64 }
65 }
66 }
67
68 fn generate_operation(
69 client: &TestClient,
70 rng: &mut StdRng,
71 _: &mut UserTestPlan,
72 cx: &TestAppContext,
73 ) -> ChannelBufferOperation {
74 let channel_store = client.channel_store().clone();
75 let channel_buffers = client.channel_buffers();
76
77 // When signed out, we can't do anything unless a channel buffer is
78 // already open.
79 if channel_buffers.is_empty()
80 && channel_store.read_with(cx, |store, _| store.channel_count() == 0)
81 {
82 return ChannelBufferOperation::Noop;
83 }
84
85 loop {
86 match rng.gen_range(0..100_u32) {
87 0..=29 => {
88 let channel_name = client.channel_store().read_with(cx, |store, cx| {
89 store.channel_dag_entries().find_map(|(_, channel)| {
90 if store.has_open_channel_buffer(channel.id, cx) {
91 None
92 } else {
93 Some(channel.name.clone())
94 }
95 })
96 });
97 if let Some(channel_name) = channel_name {
98 break ChannelBufferOperation::JoinChannelNotes { channel_name };
99 }
100 }
101
102 30..=40 => {
103 if let Some(buffer) = channel_buffers.iter().choose(rng) {
104 let channel_name = buffer.read_with(cx, |b, _| b.channel().name.clone());
105 break ChannelBufferOperation::LeaveChannelNotes { channel_name };
106 }
107 }
108
109 _ => {
110 if let Some(buffer) = channel_buffers.iter().choose(rng) {
111 break buffer.read_with(cx, |b, _| {
112 let channel_name = b.channel().name.clone();
113 let edits = b
114 .buffer()
115 .read_with(cx, |buffer, _| buffer.get_random_edits(rng, 3));
116 ChannelBufferOperation::EditChannelNotes {
117 channel_name,
118 edits,
119 }
120 });
121 }
122 }
123 }
124 }
125 }
126
127 async fn apply_operation(
128 client: &TestClient,
129 operation: ChannelBufferOperation,
130 cx: &mut TestAppContext,
131 ) -> Result<(), TestError> {
132 match operation {
133 ChannelBufferOperation::JoinChannelNotes { channel_name } => {
134 let buffer = client.channel_store().update(cx, |store, cx| {
135 let channel_id = store
136 .channel_dag_entries()
137 .find(|(_, c)| c.name == channel_name)
138 .unwrap()
139 .1
140 .id;
141 if store.has_open_channel_buffer(channel_id, cx) {
142 Err(TestError::Inapplicable)
143 } else {
144 Ok(store.open_channel_buffer(channel_id, cx))
145 }
146 })?;
147
148 log::info!(
149 "{}: opening notes for channel {channel_name}",
150 client.username
151 );
152 client.channel_buffers().insert(buffer.await?);
153 }
154
155 ChannelBufferOperation::LeaveChannelNotes { channel_name } => {
156 let buffer = cx.update(|cx| {
157 let mut left_buffer = Err(TestError::Inapplicable);
158 client.channel_buffers().retain(|buffer| {
159 if buffer.read(cx).channel().name == channel_name {
160 left_buffer = Ok(buffer.clone());
161 false
162 } else {
163 true
164 }
165 });
166 left_buffer
167 })?;
168
169 log::info!(
170 "{}: closing notes for channel {channel_name}",
171 client.username
172 );
173 cx.update(|_| drop(buffer));
174 }
175
176 ChannelBufferOperation::EditChannelNotes {
177 channel_name,
178 edits,
179 } => {
180 let channel_buffer = cx
181 .read(|cx| {
182 client
183 .channel_buffers()
184 .iter()
185 .find(|buffer| buffer.read(cx).channel().name == channel_name)
186 .cloned()
187 })
188 .ok_or_else(|| TestError::Inapplicable)?;
189
190 log::info!(
191 "{}: editing notes for channel {channel_name} with {:?}",
192 client.username,
193 edits
194 );
195
196 channel_buffer.update(cx, |buffer, cx| {
197 let buffer = buffer.buffer();
198 buffer.update(cx, |buffer, cx| {
199 let snapshot = buffer.snapshot();
200 buffer.edit(
201 edits.into_iter().map(|(range, text)| {
202 let start = snapshot.clip_offset(range.start, Bias::Left);
203 let end = snapshot.clip_offset(range.end, Bias::Right);
204 (start..end, text)
205 }),
206 None,
207 cx,
208 );
209 });
210 });
211 }
212
213 ChannelBufferOperation::Noop => Err(TestError::Inapplicable)?,
214 }
215 Ok(())
216 }
217
218 async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext) {
219 let channel_store = client.channel_store();
220 while channel_store.read_with(cx, |store, _| store.channel_count() == 0) {
221 channel_store.next_notification(cx).await;
222 }
223 }
224
225 async fn on_quiesce(server: &mut TestServer, clients: &mut [(Rc<TestClient>, TestAppContext)]) {
226 let channels = server.app_state.db.all_channels().await.unwrap();
227
228 for (client, client_cx) in clients.iter_mut() {
229 client_cx.update(|cx| {
230 client
231 .channel_buffers()
232 .retain(|b| b.read(cx).is_connected());
233 });
234 }
235
236 for (channel_id, channel_name) in channels {
237 let mut prev_text: Option<(u64, String)> = None;
238
239 let mut collaborator_user_ids = server
240 .app_state
241 .db
242 .get_channel_buffer_collaborators(channel_id)
243 .await
244 .unwrap()
245 .into_iter()
246 .map(|id| id.to_proto())
247 .collect::<Vec<_>>();
248 collaborator_user_ids.sort();
249
250 for (client, client_cx) in clients.iter() {
251 let user_id = client.user_id().unwrap();
252 client_cx.read(|cx| {
253 if let Some(channel_buffer) = client
254 .channel_buffers()
255 .iter()
256 .find(|b| b.read(cx).channel().id == channel_id.to_proto())
257 {
258 let channel_buffer = channel_buffer.read(cx);
259
260 // Assert that channel buffer's text matches other clients' copies.
261 let text = channel_buffer.buffer().read(cx).text();
262 if let Some((prev_user_id, prev_text)) = &prev_text {
263 assert_eq!(
264 &text,
265 prev_text,
266 "client {user_id} has different text than client {prev_user_id} for channel {channel_name}",
267 );
268 } else {
269 prev_text = Some((user_id, text.clone()));
270 }
271
272 // Assert that all clients and the server agree about who is present in the
273 // channel buffer.
274 let collaborators = channel_buffer.collaborators();
275 let mut user_ids =
276 collaborators.iter().map(|c| c.user_id).collect::<Vec<_>>();
277 user_ids.sort();
278 assert_eq!(
279 user_ids,
280 collaborator_user_ids,
281 "client {user_id} has different user ids for channel {channel_name} than the server",
282 );
283 }
284 });
285 }
286 }
287 }
288}