1use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
2use channel::{ChannelChat, ChannelMessageId, MessageParams};
3use collab_ui::chat_panel::ChatPanel;
4use gpui::{executor::Deterministic, BorrowAppContext, ModelHandle, TestAppContext};
5use rpc::Notification;
6use std::sync::Arc;
7use workspace::dock::Panel;
8
9#[gpui::test]
10async fn test_basic_channel_messages(
11 deterministic: Arc<Deterministic>,
12 mut cx_a: &mut TestAppContext,
13 mut cx_b: &mut TestAppContext,
14 mut cx_c: &mut TestAppContext,
15) {
16 deterministic.forbid_parking();
17 let mut server = TestServer::start(&deterministic).await;
18 let client_a = server.create_client(cx_a, "user_a").await;
19 let client_b = server.create_client(cx_b, "user_b").await;
20 let client_c = server.create_client(cx_c, "user_c").await;
21
22 let channel_id = server
23 .make_channel(
24 "the-channel",
25 None,
26 (&client_a, cx_a),
27 &mut [(&client_b, cx_b), (&client_c, cx_c)],
28 )
29 .await;
30
31 let channel_chat_a = client_a
32 .channel_store()
33 .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
34 .await
35 .unwrap();
36 let channel_chat_b = client_b
37 .channel_store()
38 .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
39 .await
40 .unwrap();
41
42 let message_id = channel_chat_a
43 .update(cx_a, |c, cx| {
44 c.send_message(
45 MessageParams {
46 text: "hi @user_c!".into(),
47 mentions: vec![(3..10, client_c.id())],
48 },
49 cx,
50 )
51 .unwrap()
52 })
53 .await
54 .unwrap();
55 channel_chat_a
56 .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
57 .await
58 .unwrap();
59
60 deterministic.run_until_parked();
61 channel_chat_b
62 .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap())
63 .await
64 .unwrap();
65
66 deterministic.run_until_parked();
67
68 let channel_chat_c = client_c
69 .channel_store()
70 .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx))
71 .await
72 .unwrap();
73
74 for (chat, cx) in [
75 (&channel_chat_a, &mut cx_a),
76 (&channel_chat_b, &mut cx_b),
77 (&channel_chat_c, &mut cx_c),
78 ] {
79 chat.update(*cx, |c, _| {
80 assert_eq!(
81 c.messages()
82 .iter()
83 .map(|m| (m.body.as_str(), m.mentions.as_slice()))
84 .collect::<Vec<_>>(),
85 vec![
86 ("hi @user_c!", [(3..10, client_c.id())].as_slice()),
87 ("two", &[]),
88 ("three", &[])
89 ],
90 "results for user {}",
91 c.client().id(),
92 );
93 });
94 }
95
96 client_c.notification_store().update(cx_c, |store, _| {
97 assert_eq!(store.notification_count(), 2);
98 assert_eq!(store.unread_notification_count(), 1);
99 assert_eq!(
100 store.notification_at(0).unwrap().notification,
101 Notification::ChannelMessageMention {
102 message_id,
103 sender_id: client_a.id(),
104 channel_id,
105 }
106 );
107 assert_eq!(
108 store.notification_at(1).unwrap().notification,
109 Notification::ChannelInvitation {
110 channel_id,
111 channel_name: "the-channel".to_string(),
112 inviter_id: client_a.id()
113 }
114 );
115 });
116}
117
118#[gpui::test]
119async fn test_rejoin_channel_chat(
120 deterministic: Arc<Deterministic>,
121 cx_a: &mut TestAppContext,
122 cx_b: &mut TestAppContext,
123) {
124 deterministic.forbid_parking();
125 let mut server = TestServer::start(&deterministic).await;
126 let client_a = server.create_client(cx_a, "user_a").await;
127 let client_b = server.create_client(cx_b, "user_b").await;
128
129 let channel_id = server
130 .make_channel(
131 "the-channel",
132 None,
133 (&client_a, cx_a),
134 &mut [(&client_b, cx_b)],
135 )
136 .await;
137
138 let channel_chat_a = client_a
139 .channel_store()
140 .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
141 .await
142 .unwrap();
143 let channel_chat_b = client_b
144 .channel_store()
145 .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
146 .await
147 .unwrap();
148
149 channel_chat_a
150 .update(cx_a, |c, cx| c.send_message("one".into(), cx).unwrap())
151 .await
152 .unwrap();
153 channel_chat_b
154 .update(cx_b, |c, cx| c.send_message("two".into(), cx).unwrap())
155 .await
156 .unwrap();
157
158 server.forbid_connections();
159 server.disconnect_client(client_a.peer_id().unwrap());
160
161 // While client A is disconnected, clients A and B both send new messages.
162 channel_chat_a
163 .update(cx_a, |c, cx| c.send_message("three".into(), cx).unwrap())
164 .await
165 .unwrap_err();
166 channel_chat_a
167 .update(cx_a, |c, cx| c.send_message("four".into(), cx).unwrap())
168 .await
169 .unwrap_err();
170 channel_chat_b
171 .update(cx_b, |c, cx| c.send_message("five".into(), cx).unwrap())
172 .await
173 .unwrap();
174 channel_chat_b
175 .update(cx_b, |c, cx| c.send_message("six".into(), cx).unwrap())
176 .await
177 .unwrap();
178
179 // Client A reconnects.
180 server.allow_connections();
181 deterministic.advance_clock(RECONNECT_TIMEOUT);
182
183 // Client A fetches the messages that were sent while they were disconnected
184 // and resends their own messages which failed to send.
185 let expected_messages = &["one", "two", "five", "six", "three", "four"];
186 assert_messages(&channel_chat_a, expected_messages, cx_a);
187 assert_messages(&channel_chat_b, expected_messages, cx_b);
188}
189
190#[gpui::test]
191async fn test_remove_channel_message(
192 deterministic: Arc<Deterministic>,
193 cx_a: &mut TestAppContext,
194 cx_b: &mut TestAppContext,
195 cx_c: &mut TestAppContext,
196) {
197 deterministic.forbid_parking();
198 let mut server = TestServer::start(&deterministic).await;
199 let client_a = server.create_client(cx_a, "user_a").await;
200 let client_b = server.create_client(cx_b, "user_b").await;
201 let client_c = server.create_client(cx_c, "user_c").await;
202
203 let channel_id = server
204 .make_channel(
205 "the-channel",
206 None,
207 (&client_a, cx_a),
208 &mut [(&client_b, cx_b), (&client_c, cx_c)],
209 )
210 .await;
211
212 let channel_chat_a = client_a
213 .channel_store()
214 .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
215 .await
216 .unwrap();
217 let channel_chat_b = client_b
218 .channel_store()
219 .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
220 .await
221 .unwrap();
222
223 // Client A sends some messages.
224 channel_chat_a
225 .update(cx_a, |c, cx| c.send_message("one".into(), cx).unwrap())
226 .await
227 .unwrap();
228 channel_chat_a
229 .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
230 .await
231 .unwrap();
232 channel_chat_a
233 .update(cx_a, |c, cx| c.send_message("three".into(), cx).unwrap())
234 .await
235 .unwrap();
236
237 // Clients A and B see all of the messages.
238 deterministic.run_until_parked();
239 let expected_messages = &["one", "two", "three"];
240 assert_messages(&channel_chat_a, expected_messages, cx_a);
241 assert_messages(&channel_chat_b, expected_messages, cx_b);
242
243 // Client A deletes one of their messages.
244 channel_chat_a
245 .update(cx_a, |c, cx| {
246 let ChannelMessageId::Saved(id) = c.message(1).id else {
247 panic!("message not saved")
248 };
249 c.remove_message(id, cx)
250 })
251 .await
252 .unwrap();
253
254 // Client B sees that the message is gone.
255 deterministic.run_until_parked();
256 let expected_messages = &["one", "three"];
257 assert_messages(&channel_chat_a, expected_messages, cx_a);
258 assert_messages(&channel_chat_b, expected_messages, cx_b);
259
260 // Client C joins the channel chat, and does not see the deleted message.
261 let channel_chat_c = client_c
262 .channel_store()
263 .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx))
264 .await
265 .unwrap();
266 assert_messages(&channel_chat_c, expected_messages, cx_c);
267}
268
269#[track_caller]
270fn assert_messages(chat: &ModelHandle<ChannelChat>, messages: &[&str], cx: &mut TestAppContext) {
271 assert_eq!(
272 chat.read_with(cx, |chat, _| chat
273 .messages()
274 .iter()
275 .map(|m| m.body.clone())
276 .collect::<Vec<_>>(),),
277 messages
278 );
279}
280
281#[gpui::test]
282async fn test_channel_message_changes(
283 deterministic: Arc<Deterministic>,
284 cx_a: &mut TestAppContext,
285 cx_b: &mut TestAppContext,
286) {
287 deterministic.forbid_parking();
288 let mut server = TestServer::start(&deterministic).await;
289 let client_a = server.create_client(cx_a, "user_a").await;
290 let client_b = server.create_client(cx_b, "user_b").await;
291
292 let channel_id = server
293 .make_channel(
294 "the-channel",
295 None,
296 (&client_a, cx_a),
297 &mut [(&client_b, cx_b)],
298 )
299 .await;
300
301 // Client A sends a message, client B should see that there is a new message.
302 let channel_chat_a = client_a
303 .channel_store()
304 .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
305 .await
306 .unwrap();
307
308 channel_chat_a
309 .update(cx_a, |c, cx| c.send_message("one".into(), cx).unwrap())
310 .await
311 .unwrap();
312
313 deterministic.run_until_parked();
314
315 let b_has_messages = cx_b.read_with(|cx| {
316 client_b
317 .channel_store()
318 .read(cx)
319 .has_new_messages(channel_id)
320 .unwrap()
321 });
322
323 assert!(b_has_messages);
324
325 // Opening the chat should clear the changed flag.
326 cx_b.update(|cx| {
327 collab_ui::init(&client_b.app_state, cx);
328 });
329 let project_b = client_b.build_empty_local_project(cx_b);
330 let workspace_b = client_b.build_workspace(&project_b, cx_b).root(cx_b);
331 let chat_panel_b = workspace_b.update(cx_b, |workspace, cx| ChatPanel::new(workspace, cx));
332 chat_panel_b
333 .update(cx_b, |chat_panel, cx| {
334 chat_panel.set_active(true, cx);
335 chat_panel.select_channel(channel_id, None, cx)
336 })
337 .await
338 .unwrap();
339
340 deterministic.run_until_parked();
341
342 let b_has_messages = cx_b.read_with(|cx| {
343 client_b
344 .channel_store()
345 .read(cx)
346 .has_new_messages(channel_id)
347 .unwrap()
348 });
349
350 assert!(!b_has_messages);
351
352 // Sending a message while the chat is open should not change the flag.
353 channel_chat_a
354 .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
355 .await
356 .unwrap();
357
358 deterministic.run_until_parked();
359
360 let b_has_messages = cx_b.read_with(|cx| {
361 client_b
362 .channel_store()
363 .read(cx)
364 .has_new_messages(channel_id)
365 .unwrap()
366 });
367
368 assert!(!b_has_messages);
369
370 // Sending a message while the chat is closed should change the flag.
371 chat_panel_b.update(cx_b, |chat_panel, cx| {
372 chat_panel.set_active(false, cx);
373 });
374
375 // Sending a message while the chat is open should not change the flag.
376 channel_chat_a
377 .update(cx_a, |c, cx| c.send_message("three".into(), cx).unwrap())
378 .await
379 .unwrap();
380
381 deterministic.run_until_parked();
382
383 let b_has_messages = cx_b.read_with(|cx| {
384 client_b
385 .channel_store()
386 .read(cx)
387 .has_new_messages(channel_id)
388 .unwrap()
389 });
390
391 assert!(b_has_messages);
392
393 // Closing the chat should re-enable change tracking
394 cx_b.update(|_| drop(chat_panel_b));
395
396 channel_chat_a
397 .update(cx_a, |c, cx| c.send_message("four".into(), cx).unwrap())
398 .await
399 .unwrap();
400
401 deterministic.run_until_parked();
402
403 let b_has_messages = cx_b.read_with(|cx| {
404 client_b
405 .channel_store()
406 .read(cx)
407 .has_new_messages(channel_id)
408 .unwrap()
409 });
410
411 assert!(b_has_messages);
412}