1use crate::{Channel, ChannelStore};
2use anyhow::{Context as _, Result};
3use client::{
4 ChannelId, Client, Subscription, TypedEnvelope, UserId, proto,
5 user::{User, UserStore},
6};
7use collections::HashSet;
8use futures::lock::Mutex;
9use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
10use rand::prelude::*;
11use rpc::AnyProtoClient;
12use std::{
13 ops::{ControlFlow, Range},
14 sync::Arc,
15};
16use sum_tree::{Bias, Dimensions, SumTree};
17use time::OffsetDateTime;
18use util::{ResultExt as _, TryFutureExt, post_inc};
19
/// Client-side state for the chat of a single channel.
///
/// Holds the messages loaded so far (saved and still-pending ones) together
/// with the bookkeeping used to page in history and acknowledge messages.
pub struct ChannelChat {
    /// Id of the channel this chat belongs to.
    pub channel_id: ChannelId,
    /// Messages currently held locally, summarized by max id and count.
    messages: SumTree<ChannelMessage>,
    /// Ids already passed to `acknowledge_message`, so each is sent at most once.
    acknowledged_message_ids: HashSet<u64>,
    channel_store: Entity<ChannelStore>,
    /// The `done` flag of the most recently processed message-loading response.
    loaded_all_messages: bool,
    /// Last id acknowledged through `acknowledge_last_message`.
    last_acknowledged_id: Option<u64>,
    /// Counter used to mint `ChannelMessageId::Pending` ids.
    next_pending_message_id: usize,
    /// Id of the first message of the most recently loaded batch (or of the
    /// first successfully sent message when nothing was loaded); used as the
    /// paging position by `load_more_messages`.
    first_loaded_message_id: Option<u64>,
    user_store: Entity<UserStore>,
    rpc: Arc<Client>,
    /// Held by `send_message` while its request is in flight, so those
    /// requests are issued one at a time.
    outgoing_messages_lock: Arc<Mutex<()>>,
    /// Source of message nonces.
    rng: StdRng,
    /// Entity subscription created in `new`; kept for the chat's lifetime.
    _subscription: Subscription,
}
35
/// Caller-supplied content for sending or editing a chat message.
#[derive(Debug, PartialEq, Eq)]
pub struct MessageParams {
    /// Message body.
    pub text: String,
    /// Mentioned users, each paired with a range (presumably into `text` —
    /// confirm the unit against the caller).
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message being replied to, if any.
    pub reply_to_message_id: Option<u64>,
}
42
/// A single chat message, either saved on the server or still pending locally.
#[derive(Clone, Debug)]
pub struct ChannelMessage {
    /// Saved (server-assigned) or pending (locally-assigned) identifier.
    pub id: ChannelMessageId,
    pub body: String,
    pub timestamp: OffsetDateTime,
    pub sender: Arc<User>,
    /// Random value attached when sending; `insert_messages` uses it to match
    /// a pending message with its saved counterpart.
    pub nonce: u128,
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message this one replies to, if any.
    pub reply_to_message_id: Option<u64>,
    /// Time of the last edit, if the message was edited.
    pub edited_at: Option<OffsetDateTime>,
}
54
/// Identifier of a chat message.
///
/// Variant order is significant: the derived `Ord` sorts every `Saved` id
/// before every `Pending` id.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ChannelMessageId {
    /// Id of a message that exists on the server.
    Saved(u64),
    /// Locally assigned id of a message that has not been confirmed yet.
    Pending(usize),
}

impl From<ChannelMessageId> for Option<u64> {
    /// Yields the id of a saved message, or `None` for a pending one.
    fn from(val: ChannelMessageId) -> Self {
        if let ChannelMessageId::Saved(saved_id) = val {
            Some(saved_id)
        } else {
            None
        }
    }
}
69
/// `SumTree` summary for a run of messages.
#[derive(Clone, Debug, Default)]
pub struct ChannelMessageSummary {
    /// Id of the last message folded into this summary.
    max_id: ChannelMessageId,
    /// Number of messages covered by this summary.
    count: usize,
}
75
/// Tree dimension that counts messages; used to address messages by index.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct Count(usize);
78
/// Events emitted by `ChannelChat`.
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelChatEvent {
    /// A range of message indices was replaced or removed.
    MessagesUpdated {
        /// Index range (prior to the change) that was affected.
        old_range: Range<usize>,
        /// Number of messages that took the place of `old_range`
        /// (`0` for a pure removal).
        new_count: usize,
    },
    /// The content of a single message was edited in place.
    UpdateMessage {
        message_id: ChannelMessageId,
        /// Index of the edited message.
        message_ix: usize,
    },
    /// A message sent on the channel was received from the server.
    NewMessage {
        channel_id: ChannelId,
        message_id: u64,
    },
}
94
// Allows `ChannelChat` to emit `ChannelChatEvent`s via `cx.emit`.
impl EventEmitter<ChannelChatEvent> for ChannelChat {}
/// Registers the chat's entity message handlers (message sent, removed and
/// updated) on `client`.
pub fn init(client: &AnyProtoClient) {
    client.add_entity_message_handler(ChannelChat::handle_message_sent);
    client.add_entity_message_handler(ChannelChat::handle_message_removed);
    client.add_entity_message_handler(ChannelChat::handle_message_updated);
}
101
102impl ChannelChat {
103 pub async fn new(
104 channel: Arc<Channel>,
105 channel_store: Entity<ChannelStore>,
106 user_store: Entity<UserStore>,
107 client: Arc<Client>,
108 cx: &mut AsyncApp,
109 ) -> Result<Entity<Self>> {
110 let channel_id = channel.id;
111 let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
112
113 let response = client
114 .request(proto::JoinChannelChat {
115 channel_id: channel_id.0,
116 })
117 .await?;
118
119 let handle = cx.new(|cx| {
120 cx.on_release(Self::release).detach();
121 Self {
122 channel_id: channel.id,
123 user_store: user_store.clone(),
124 channel_store,
125 rpc: client.clone(),
126 outgoing_messages_lock: Default::default(),
127 messages: Default::default(),
128 acknowledged_message_ids: Default::default(),
129 loaded_all_messages: false,
130 next_pending_message_id: 0,
131 last_acknowledged_id: None,
132 rng: StdRng::from_os_rng(),
133 first_loaded_message_id: None,
134 _subscription: subscription.set_entity(&cx.entity(), &cx.to_async()),
135 }
136 })?;
137 Self::handle_loaded_messages(
138 handle.downgrade(),
139 user_store,
140 client,
141 response.messages,
142 response.done,
143 cx,
144 )
145 .await?;
146 Ok(handle)
147 }
148
149 fn release(&mut self, _: &mut App) {
150 self.rpc
151 .send(proto::LeaveChannelChat {
152 channel_id: self.channel_id.0,
153 })
154 .log_err();
155 }
156
157 pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
158 self.channel_store
159 .read(cx)
160 .channel_for_id(self.channel_id)
161 .cloned()
162 }
163
    /// Returns the client used for this chat's requests.
    pub fn client(&self) -> &Arc<Client> {
        &self.rpc
    }
167
168 pub fn send_message(
169 &mut self,
170 message: MessageParams,
171 cx: &mut Context<Self>,
172 ) -> Result<Task<Result<u64>>> {
173 anyhow::ensure!(
174 !message.text.trim().is_empty(),
175 "message body can't be empty"
176 );
177
178 let current_user = self
179 .user_store
180 .read(cx)
181 .current_user()
182 .context("current_user is not present")?;
183
184 let channel_id = self.channel_id;
185 let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
186 let nonce = self.rng.random();
187 self.insert_messages(
188 SumTree::from_item(
189 ChannelMessage {
190 id: pending_id,
191 body: message.text.clone(),
192 sender: current_user,
193 timestamp: OffsetDateTime::now_utc(),
194 mentions: message.mentions.clone(),
195 nonce,
196 reply_to_message_id: message.reply_to_message_id,
197 edited_at: None,
198 },
199 &(),
200 ),
201 cx,
202 );
203 let user_store = self.user_store.clone();
204 let rpc = self.rpc.clone();
205 let outgoing_messages_lock = self.outgoing_messages_lock.clone();
206
207 // todo - handle messages that fail to send (e.g. >1024 chars)
208 Ok(cx.spawn(async move |this, cx| {
209 let outgoing_message_guard = outgoing_messages_lock.lock().await;
210 let request = rpc.request(proto::SendChannelMessage {
211 channel_id: channel_id.0,
212 body: message.text,
213 nonce: Some(nonce.into()),
214 mentions: mentions_to_proto(&message.mentions),
215 reply_to_message_id: message.reply_to_message_id,
216 });
217 let response = request.await?;
218 drop(outgoing_message_guard);
219 let response = response.message.context("invalid message")?;
220 let id = response.id;
221 let message = ChannelMessage::from_proto(response, &user_store, cx).await?;
222 this.update(cx, |this, cx| {
223 this.insert_messages(SumTree::from_item(message, &()), cx);
224 if this.first_loaded_message_id.is_none() {
225 this.first_loaded_message_id = Some(id);
226 }
227 })?;
228 Ok(id)
229 }))
230 }
231
232 pub fn remove_message(&mut self, id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
233 let response = self.rpc.request(proto::RemoveChannelMessage {
234 channel_id: self.channel_id.0,
235 message_id: id,
236 });
237 cx.spawn(async move |this, cx| {
238 response.await?;
239 this.update(cx, |this, cx| {
240 this.message_removed(id, cx);
241 })?;
242 Ok(())
243 })
244 }
245
246 pub fn update_message(
247 &mut self,
248 id: u64,
249 message: MessageParams,
250 cx: &mut Context<Self>,
251 ) -> Result<Task<Result<()>>> {
252 self.message_update(
253 ChannelMessageId::Saved(id),
254 message.text.clone(),
255 message.mentions.clone(),
256 Some(OffsetDateTime::now_utc()),
257 cx,
258 );
259
260 let nonce: u128 = self.rng.random();
261
262 let request = self.rpc.request(proto::UpdateChannelMessage {
263 channel_id: self.channel_id.0,
264 message_id: id,
265 body: message.text,
266 nonce: Some(nonce.into()),
267 mentions: mentions_to_proto(&message.mentions),
268 });
269 Ok(cx.spawn(async move |_, _| {
270 request.await?;
271 Ok(())
272 }))
273 }
274
275 pub fn load_more_messages(&mut self, cx: &mut Context<Self>) -> Option<Task<Option<()>>> {
276 if self.loaded_all_messages {
277 return None;
278 }
279
280 let rpc = self.rpc.clone();
281 let user_store = self.user_store.clone();
282 let channel_id = self.channel_id;
283 let before_message_id = self.first_loaded_message_id()?;
284 Some(cx.spawn(async move |this, cx| {
285 async move {
286 let response = rpc
287 .request(proto::GetChannelMessages {
288 channel_id: channel_id.0,
289 before_message_id,
290 })
291 .await?;
292 Self::handle_loaded_messages(
293 this,
294 user_store,
295 rpc,
296 response.messages,
297 response.done,
298 cx,
299 )
300 .await?;
301
302 anyhow::Ok(())
303 }
304 .log_err()
305 .await
306 }))
307 }
308
309 pub fn first_loaded_message_id(&mut self) -> Option<u64> {
310 self.first_loaded_message_id
311 }
312
313 /// Load a message by its id, if it's already stored locally.
314 pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
315 self.messages.iter().find(|message| match message.id {
316 ChannelMessageId::Saved(message_id) => message_id == id,
317 ChannelMessageId::Pending(_) => false,
318 })
319 }
320
    /// Load all of the chat messages since a certain message id.
    ///
    /// For now, we always maintain a suffix of the channel's messages.
    ///
    /// Keeps calling `load_more_messages` until the first loaded message id
    /// is at or before `message_id`, then looks the message up. Returns the
    /// message's index among the local messages, or `None` if the message is
    /// not present, no further history can be loaded, a load failed, or the
    /// entity could not be updated.
    pub async fn load_history_since_message(
        chat: Entity<Self>,
        message_id: u64,
        mut cx: AsyncApp,
    ) -> Option<usize> {
        loop {
            let step = chat
                .update(&mut cx, |chat, cx| {
                    // Enough history is loaded: resolve the lookup now.
                    if let Some(first_id) = chat.first_loaded_message_id()
                        && first_id <= message_id
                    {
                        let mut cursor = chat
                            .messages
                            .cursor::<Dimensions<ChannelMessageId, Count>>(&());
                        let message_id = ChannelMessageId::Saved(message_id);
                        cursor.seek(&message_id, Bias::Left);
                        return ControlFlow::Break(
                            if cursor
                                .item()
                                .is_some_and(|message| message.id == message_id)
                            {
                                // The `Count` dimension at the cursor start is
                                // the number of messages before this one.
                                Some(cursor.start().1.0)
                            } else {
                                None
                            },
                        );
                    }
                    // Otherwise request an older batch and try again.
                    ControlFlow::Continue(chat.load_more_messages(cx))
                })
                .log_err()?;
            match step {
                ControlFlow::Break(ix) => return ix,
                // `task?` gives up when nothing more can be loaded;
                // `.await?` gives up when the load itself failed.
                ControlFlow::Continue(task) => task?.await?,
            }
        }
    }
360
361 pub fn acknowledge_last_message(&mut self, cx: &mut Context<Self>) {
362 if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id
363 && self
364 .last_acknowledged_id
365 .is_none_or(|acknowledged_id| acknowledged_id < latest_message_id)
366 {
367 self.rpc
368 .send(proto::AckChannelMessage {
369 channel_id: self.channel_id.0,
370 message_id: latest_message_id,
371 })
372 .ok();
373 self.last_acknowledged_id = Some(latest_message_id);
374 self.channel_store.update(cx, |store, cx| {
375 store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
376 });
377 }
378 }
379
380 async fn handle_loaded_messages(
381 this: WeakEntity<Self>,
382 user_store: Entity<UserStore>,
383 rpc: Arc<Client>,
384 proto_messages: Vec<proto::ChannelMessage>,
385 loaded_all_messages: bool,
386 cx: &mut AsyncApp,
387 ) -> Result<()> {
388 let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
389
390 let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
391 let loaded_message_ids = this.read_with(cx, |this, _| {
392 let mut loaded_message_ids: HashSet<u64> = HashSet::default();
393 for message in loaded_messages.iter() {
394 if let Some(saved_message_id) = message.id.into() {
395 loaded_message_ids.insert(saved_message_id);
396 }
397 }
398 for message in this.messages.iter() {
399 if let Some(saved_message_id) = message.id.into() {
400 loaded_message_ids.insert(saved_message_id);
401 }
402 }
403 loaded_message_ids
404 })?;
405
406 let missing_ancestors = loaded_messages
407 .iter()
408 .filter_map(|message| {
409 if let Some(ancestor_id) = message.reply_to_message_id
410 && !loaded_message_ids.contains(&ancestor_id)
411 {
412 return Some(ancestor_id);
413 }
414 None
415 })
416 .collect::<Vec<_>>();
417
418 let loaded_ancestors = if missing_ancestors.is_empty() {
419 None
420 } else {
421 let response = rpc
422 .request(proto::GetChannelMessagesById {
423 message_ids: missing_ancestors,
424 })
425 .await?;
426 Some(messages_from_proto(response.messages, &user_store, cx).await?)
427 };
428 this.update(cx, |this, cx| {
429 this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
430 this.loaded_all_messages = loaded_all_messages;
431 this.insert_messages(loaded_messages, cx);
432 if let Some(loaded_ancestors) = loaded_ancestors {
433 this.insert_messages(loaded_ancestors, cx);
434 }
435 })?;
436
437 Ok(())
438 }
439
    /// Re-joins the channel chat in a detached background task.
    ///
    /// Sends `JoinChannelChat` again and processes the returned messages,
    /// then re-sends every message that is still pending locally (reusing its
    /// original nonce) and inserts each saved message from the responses.
    /// The first error aborts the remaining work and is logged.
    pub fn rejoin(&mut self, cx: &mut Context<Self>) {
        let user_store = self.user_store.clone();
        let rpc = self.rpc.clone();
        let channel_id = self.channel_id;
        cx.spawn(async move |this, cx| {
            async move {
                let response = rpc
                    .request(proto::JoinChannelChat {
                        channel_id: channel_id.0,
                    })
                    .await?;
                Self::handle_loaded_messages(
                    this.clone(),
                    user_store.clone(),
                    rpc.clone(),
                    response.messages,
                    response.done,
                    cx,
                )
                .await?;

                // Snapshot the pending messages so they can be re-sent
                // without holding a borrow of the entity across awaits.
                let pending_messages = this.read_with(cx, |this, _| {
                    this.pending_messages().cloned().collect::<Vec<_>>()
                })?;

                // Re-send sequentially, one request at a time.
                for pending_message in pending_messages {
                    let request = rpc.request(proto::SendChannelMessage {
                        channel_id: channel_id.0,
                        body: pending_message.body,
                        mentions: mentions_to_proto(&pending_message.mentions),
                        nonce: Some(pending_message.nonce.into()),
                        reply_to_message_id: pending_message.reply_to_message_id,
                    });
                    let response = request.await?;
                    let message = ChannelMessage::from_proto(
                        response.message.context("invalid message")?,
                        &user_store,
                        cx,
                    )
                    .await?;
                    this.update(cx, |this, cx| {
                        this.insert_messages(SumTree::from_item(message, &()), cx);
                    })?;
                }

                anyhow::Ok(())
            }
            .log_err()
            .await
        })
        .detach();
    }
492
    /// Number of messages currently held locally (saved and pending).
    pub fn message_count(&self) -> usize {
        self.messages.summary().count
    }
496
    /// Returns the underlying tree of locally held messages.
    pub fn messages(&self) -> &SumTree<ChannelMessage> {
        &self.messages
    }
500
501 pub fn message(&self, ix: usize) -> &ChannelMessage {
502 let mut cursor = self.messages.cursor::<Count>(&());
503 cursor.seek(&Count(ix), Bias::Right);
504 cursor.item().unwrap()
505 }
506
507 pub fn acknowledge_message(&mut self, id: u64) {
508 if self.acknowledged_message_ids.insert(id) {
509 self.rpc
510 .send(proto::AckChannelMessage {
511 channel_id: self.channel_id.0,
512 message_id: id,
513 })
514 .ok();
515 }
516 }
517
    /// Iterates over the messages whose indices fall in `range`, stopping
    /// early if fewer messages are available.
    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
        let mut cursor = self.messages.cursor::<Count>(&());
        // Position the cursor on the message at index `range.start`.
        cursor.seek(&Count(range.start), Bias::Right);
        cursor.take(range.len())
    }
523
    /// Iterates over the messages that are still pending.
    ///
    /// Pending ids sort after all saved ids, so seeking to `Pending(0)`
    /// positions the cursor at the start of the pending tail.
    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left);
        cursor
    }
529
530 async fn handle_message_sent(
531 this: Entity<Self>,
532 message: TypedEnvelope<proto::ChannelMessageSent>,
533 mut cx: AsyncApp,
534 ) -> Result<()> {
535 let user_store = this.read_with(&cx, |this, _| this.user_store.clone())?;
536 let message = message.payload.message.context("empty message")?;
537 let message_id = message.id;
538
539 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
540 this.update(&mut cx, |this, cx| {
541 this.insert_messages(SumTree::from_item(message, &()), cx);
542 cx.emit(ChannelChatEvent::NewMessage {
543 channel_id: this.channel_id,
544 message_id,
545 })
546 })?;
547
548 Ok(())
549 }
550
551 async fn handle_message_removed(
552 this: Entity<Self>,
553 message: TypedEnvelope<proto::RemoveChannelMessage>,
554 mut cx: AsyncApp,
555 ) -> Result<()> {
556 this.update(&mut cx, |this, cx| {
557 this.message_removed(message.payload.message_id, cx)
558 })?;
559 Ok(())
560 }
561
562 async fn handle_message_updated(
563 this: Entity<Self>,
564 message: TypedEnvelope<proto::ChannelMessageUpdate>,
565 mut cx: AsyncApp,
566 ) -> Result<()> {
567 let user_store = this.read_with(&cx, |this, _| this.user_store.clone())?;
568 let message = message.payload.message.context("empty message")?;
569
570 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
571
572 this.update(&mut cx, |this, cx| {
573 this.message_update(
574 message.id,
575 message.body,
576 message.mentions,
577 message.edited_at,
578 cx,
579 )
580 })?;
581 Ok(())
582 }
583
    /// Merges `messages` into the local message tree.
    ///
    /// Existing messages whose ids fall within the id span of `messages` are
    /// replaced by the incoming ones. When the incoming batch ends with a
    /// saved message, pending messages whose nonce matches an incoming
    /// message are dropped as well. Emits `MessagesUpdated` events describing
    /// the changes and notifies observers. Does nothing for an empty batch.
    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut Context<Self>) {
        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
            // Nonces of the incoming messages, used below to recognise
            // pending messages that they supersede.
            let nonces = messages
                .cursor::<()>(&())
                .map(|m| m.nonce)
                .collect::<HashSet<_>>();

            let mut old_cursor = self
                .messages
                .cursor::<Dimensions<ChannelMessageId, Count>>(&());
            // Keep the existing messages that precede the incoming batch.
            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left);
            let start_ix = old_cursor.start().1.0;
            // Existing messages within the incoming id span are replaced.
            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right);
            let removed_count = removed_messages.summary().count;
            let new_count = messages.summary().count;
            let end_ix = start_ix + removed_count;

            new_messages.append(messages, &());

            // Index ranges (in the old tree) of superseded pending messages.
            let mut ranges = Vec::<Range<usize>>::new();
            if new_messages.last().unwrap().is_pending() {
                // The batch ends with a pending message: keep the remainder
                // of the old tree untouched.
                new_messages.append(old_cursor.suffix(), &());
            } else {
                // Keep the remaining saved messages as they are...
                new_messages.append(
                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left),
                    &(),
                );

                // ...then walk the pending tail, dropping pending messages
                // whose nonce matches an incoming message.
                while let Some(message) = old_cursor.item() {
                    let message_ix = old_cursor.start().1.0;
                    if nonces.contains(&message.nonce) {
                        // Coalesce adjacent dropped messages into one range.
                        if ranges.last().is_some_and(|r| r.end == message_ix) {
                            ranges.last_mut().unwrap().end += 1;
                        } else {
                            ranges.push(message_ix..message_ix + 1);
                        }
                    } else {
                        new_messages.push(message.clone(), &());
                    }
                    old_cursor.next();
                }
            }

            drop(old_cursor);
            self.messages = new_messages;

            // Report the removals from the highest range to the lowest,
            // followed by the replacement of the incoming span.
            for range in ranges.into_iter().rev() {
                cx.emit(ChannelChatEvent::MessagesUpdated {
                    old_range: range,
                    new_count: 0,
                });
            }
            cx.emit(ChannelChatEvent::MessagesUpdated {
                old_range: start_ix..end_ix,
                new_count,
            });

            cx.notify();
        }
    }
644
645 fn message_removed(&mut self, id: u64, cx: &mut Context<Self>) {
646 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
647 let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left);
648 if let Some(item) = cursor.item()
649 && item.id == ChannelMessageId::Saved(id)
650 {
651 let deleted_message_ix = messages.summary().count;
652 cursor.next();
653 messages.append(cursor.suffix(), &());
654 drop(cursor);
655 self.messages = messages;
656
657 // If the message that was deleted was the last acknowledged message,
658 // replace the acknowledged message with an earlier one.
659 self.channel_store.update(cx, |store, _| {
660 let summary = self.messages.summary();
661 if summary.count == 0 {
662 store.set_acknowledged_message_id(self.channel_id, None);
663 } else if deleted_message_ix == summary.count
664 && let ChannelMessageId::Saved(id) = summary.max_id
665 {
666 store.set_acknowledged_message_id(self.channel_id, Some(id));
667 }
668 });
669
670 cx.emit(ChannelChatEvent::MessagesUpdated {
671 old_range: deleted_message_ix..deleted_message_ix + 1,
672 new_count: 0,
673 });
674 }
675 }
676
677 fn message_update(
678 &mut self,
679 id: ChannelMessageId,
680 body: String,
681 mentions: Vec<(Range<usize>, u64)>,
682 edited_at: Option<OffsetDateTime>,
683 cx: &mut Context<Self>,
684 ) {
685 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
686 let mut messages = cursor.slice(&id, Bias::Left);
687 let ix = messages.summary().count;
688
689 if let Some(mut message_to_update) = cursor.item().cloned() {
690 message_to_update.body = body;
691 message_to_update.mentions = mentions;
692 message_to_update.edited_at = edited_at;
693 messages.push(message_to_update, &());
694 cursor.next();
695 }
696
697 messages.append(cursor.suffix(), &());
698 drop(cursor);
699 self.messages = messages;
700
701 cx.emit(ChannelChatEvent::UpdateMessage {
702 message_ix: ix,
703 message_id: id,
704 });
705
706 cx.notify();
707 }
708}
709
710async fn messages_from_proto(
711 proto_messages: Vec<proto::ChannelMessage>,
712 user_store: &Entity<UserStore>,
713 cx: &mut AsyncApp,
714) -> Result<SumTree<ChannelMessage>> {
715 let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
716 let mut result = SumTree::default();
717 result.extend(messages, &());
718 Ok(result)
719}
720
721impl ChannelMessage {
722 pub async fn from_proto(
723 message: proto::ChannelMessage,
724 user_store: &Entity<UserStore>,
725 cx: &mut AsyncApp,
726 ) -> Result<Self> {
727 let sender = user_store
728 .update(cx, |user_store, cx| {
729 user_store.get_user(message.sender_id, cx)
730 })?
731 .await?;
732
733 let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
734 if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
735 return Some(a);
736 }
737
738 None
739 });
740
741 Ok(ChannelMessage {
742 id: ChannelMessageId::Saved(message.id),
743 body: message.body,
744 mentions: message
745 .mentions
746 .into_iter()
747 .filter_map(|mention| {
748 let range = mention.range?;
749 Some((range.start as usize..range.end as usize, mention.user_id))
750 })
751 .collect(),
752 timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
753 sender,
754 nonce: message.nonce.context("nonce is required")?.into(),
755 reply_to_message_id: message.reply_to_message_id,
756 edited_at,
757 })
758 }
759
760 pub fn is_pending(&self) -> bool {
761 matches!(self.id, ChannelMessageId::Pending(_))
762 }
763
764 pub async fn from_proto_vec(
765 proto_messages: Vec<proto::ChannelMessage>,
766 user_store: &Entity<UserStore>,
767 cx: &mut AsyncApp,
768 ) -> Result<Vec<Self>> {
769 let unique_user_ids = proto_messages
770 .iter()
771 .map(|m| m.sender_id)
772 .collect::<HashSet<_>>()
773 .into_iter()
774 .collect();
775 user_store
776 .update(cx, |user_store, cx| {
777 user_store.get_users(unique_user_ids, cx)
778 })?
779 .await?;
780
781 let mut messages = Vec::with_capacity(proto_messages.len());
782 for message in proto_messages {
783 messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
784 }
785 Ok(messages)
786 }
787}
788
789pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
790 mentions
791 .iter()
792 .map(|(range, user_id)| proto::ChatMention {
793 range: Some(proto::Range {
794 start: range.start as u64,
795 end: range.end as u64,
796 }),
797 user_id: *user_id,
798 })
799 .collect()
800}
801
impl sum_tree::Item for ChannelMessage {
    type Summary = ChannelMessageSummary;

    /// Summary of a single message: its own id and a count of one.
    fn summary(&self, _cx: &()) -> Self::Summary {
        ChannelMessageSummary {
            max_id: self.id,
            count: 1,
        }
    }
}
812
impl Default for ChannelMessageId {
    /// The default id is `Saved(0)`, which is also used as the zero value of
    /// the id dimension and of the summary.
    fn default() -> Self {
        Self::Saved(0)
    }
}
818
impl sum_tree::Summary for ChannelMessageSummary {
    type Context = ();

    fn zero(_cx: &Self::Context) -> Self {
        Default::default()
    }

    /// Folds `summary` (for messages that come after) into `self`: the max
    /// id becomes the later summary's max id and the counts add up.
    fn add_summary(&mut self, summary: &Self, _: &()) {
        self.max_id = summary.max_id;
        self.count += summary.count;
    }
}
831
// Lets cursors seek through the tree by message id.
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
    fn zero(_cx: &()) -> Self {
        Default::default()
    }

    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
        // Checked in debug builds only: ids must strictly increase along the tree.
        debug_assert!(summary.max_id > *self);
        *self = summary.max_id;
    }
}
842
// Lets cursors seek through the tree by message index.
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
    fn zero(_cx: &()) -> Self {
        Default::default()
    }

    /// Accumulates the number of messages covered by `summary`.
    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
        self.0 += summary.count;
    }
}
852
853impl<'a> From<&'a str> for MessageParams {
854 fn from(value: &'a str) -> Self {
855 Self {
856 text: value.into(),
857 mentions: Vec::new(),
858 reply_to_message_id: None,
859 }
860 }
861}