1use crate::{Channel, ChannelStore};
2use anyhow::{anyhow, Result};
3use client::{
4 proto,
5 user::{User, UserStore},
6 ChannelId, Client, Subscription, TypedEnvelope, UserId,
7};
8use collections::HashSet;
9use futures::lock::Mutex;
10use gpui::{
11 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
12};
13use rand::prelude::*;
14use std::{
15 ops::{ControlFlow, Range},
16 sync::Arc,
17};
18use sum_tree::{Bias, SumTree};
19use time::OffsetDateTime;
20use util::{post_inc, ResultExt as _, TryFutureExt};
21
/// Client-side state for the chat of a single channel.
///
/// Holds the locally loaded messages together with the handles used to talk to the
/// server (`rpc`) and to look up users and channels (`user_store`, `channel_store`).
pub struct ChannelChat {
    /// Id of the channel this chat belongs to.
    pub channel_id: ChannelId,
    /// Locally loaded messages; lookups seek through this tree by `ChannelMessageId`
    /// and by index (`Count`).
    messages: SumTree<ChannelMessage>,
    /// Ids already sent by `acknowledge_message`, so each id is acknowledged at most once.
    acknowledged_message_ids: HashSet<u64>,
    /// Store used to resolve the channel and to record acknowledged message ids.
    channel_store: Model<ChannelStore>,
    /// Set from the server's `done` flag; once true, `load_more_messages` returns `None`.
    loaded_all_messages: bool,
    /// Last id sent by `acknowledge_last_message`, if any.
    last_acknowledged_id: Option<u64>,
    /// Counter used to number the next `ChannelMessageId::Pending` message.
    next_pending_message_id: usize,
    /// Id passed as `before_message_id` when requesting more history.
    first_loaded_message_id: Option<u64>,
    /// Store used to resolve message senders and the current user.
    user_store: Model<UserStore>,
    /// Client used for all requests and notifications to the server.
    rpc: Arc<Client>,
    /// Held while a `SendChannelMessage` request issued by `send_message` is in flight.
    outgoing_messages_lock: Arc<Mutex<()>>,
    /// Source of the random nonces attached to outgoing messages.
    rng: StdRng,
    /// Subscription obtained from `Client::subscribe_to_entity`, kept for the chat's lifetime.
    // NOTE(review): presumably dropping this unsubscribes the model — confirm in `client`.
    _subscription: Subscription,
}
37
/// Content of a message to send or to apply as an edit.
#[derive(Debug, PartialEq, Eq)]
pub struct MessageParams {
    /// Body of the message.
    pub text: String,
    /// Mentioned users, each paired with a range.
    // NOTE(review): the range presumably indexes into `text` — confirm the unit (bytes vs chars).
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message this one replies to, if any.
    pub reply_to_message_id: Option<u64>,
}
44
/// A chat message held locally, either saved on the server or still pending.
#[derive(Clone, Debug)]
pub struct ChannelMessage {
    /// Server id for saved messages, local counter value for pending ones.
    pub id: ChannelMessageId,
    /// Text of the message.
    pub body: String,
    /// Time the message was sent.
    pub timestamp: OffsetDateTime,
    /// User who sent the message.
    pub sender: Arc<User>,
    /// Value generated when the message is first sent; `insert_messages` uses it to match
    /// a pending message with the saved copy that replaces it.
    pub nonce: u128,
    /// Mentioned users, each paired with a range.
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message this one replies to, if any.
    pub reply_to_message_id: Option<u64>,
    /// Time of the last edit, if the message was edited.
    pub edited_at: Option<OffsetDateTime>,
}
56
/// Identifier of a message in a chat.
///
/// The derived ordering follows the variant order, so every `Saved` id sorts before
/// every `Pending` id.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ChannelMessageId {
    /// Id of a message stored on the server.
    Saved(u64),
    /// Locally assigned id of a message that has not been confirmed by the server yet.
    Pending(usize),
}
62
63impl Into<Option<u64>> for ChannelMessageId {
64 fn into(self) -> Option<u64> {
65 match self {
66 ChannelMessageId::Saved(id) => Some(id),
67 ChannelMessageId::Pending(_) => None,
68 }
69 }
70}
71
/// Summary of a run of messages in the message tree.
#[derive(Clone, Debug, Default)]
pub struct ChannelMessageSummary {
    /// Id of the last message added to the summarized run.
    max_id: ChannelMessageId,
    /// Number of messages in the summarized run.
    count: usize,
}
77
/// Tree dimension that counts messages, used to address messages by index.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct Count(usize);
80
/// Events emitted by a `ChannelChat` when its message list changes.
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelChatEvent {
    /// The messages at indices `old_range` were replaced by `new_count` messages;
    /// removals are reported with `new_count: 0`.
    MessagesUpdated {
        old_range: Range<usize>,
        new_count: usize,
    },
    /// The message `message_id`, located at index `message_ix`, was edited.
    UpdateMessage {
        message_id: ChannelMessageId,
        message_ix: usize,
    },
    /// A `ChannelMessageSent` notification for `channel_id` was handled and the message
    /// with id `message_id` was inserted.
    NewMessage {
        channel_id: ChannelId,
        message_id: u64,
    },
}
96
// Lets `ChannelChat` models emit `ChannelChatEvent`s through `cx.emit`.
impl EventEmitter<ChannelChatEvent> for ChannelChat {}
/// Registers the `ChannelChat` handlers for sent, removed and updated channel messages
/// on `client`.
pub fn init(client: &Arc<Client>) {
    client.add_model_message_handler(ChannelChat::handle_message_sent);
    client.add_model_message_handler(ChannelChat::handle_message_removed);
    client.add_model_message_handler(ChannelChat::handle_message_updated);
}
103
104impl ChannelChat {
105 pub async fn new(
106 channel: Arc<Channel>,
107 channel_store: Model<ChannelStore>,
108 user_store: Model<UserStore>,
109 client: Arc<Client>,
110 mut cx: AsyncAppContext,
111 ) -> Result<Model<Self>> {
112 let channel_id = channel.id;
113 let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
114
115 let response = client
116 .request(proto::JoinChannelChat {
117 channel_id: channel_id.0,
118 })
119 .await?;
120
121 let handle = cx.new_model(|cx| {
122 cx.on_release(Self::release).detach();
123 Self {
124 channel_id: channel.id,
125 user_store: user_store.clone(),
126 channel_store,
127 rpc: client.clone(),
128 outgoing_messages_lock: Default::default(),
129 messages: Default::default(),
130 acknowledged_message_ids: Default::default(),
131 loaded_all_messages: false,
132 next_pending_message_id: 0,
133 last_acknowledged_id: None,
134 rng: StdRng::from_entropy(),
135 first_loaded_message_id: None,
136 _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
137 }
138 })?;
139 Self::handle_loaded_messages(
140 handle.downgrade(),
141 user_store,
142 client,
143 response.messages,
144 response.done,
145 &mut cx,
146 )
147 .await?;
148 Ok(handle)
149 }
150
151 fn release(&mut self, _: &mut AppContext) {
152 self.rpc
153 .send(proto::LeaveChannelChat {
154 channel_id: self.channel_id.0,
155 })
156 .log_err();
157 }
158
159 pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
160 self.channel_store
161 .read(cx)
162 .channel_for_id(self.channel_id)
163 .cloned()
164 }
165
    /// Returns the client this chat uses to communicate with the server.
    pub fn client(&self) -> &Arc<Client> {
        &self.rpc
    }
169
170 pub fn send_message(
171 &mut self,
172 message: MessageParams,
173 cx: &mut ModelContext<Self>,
174 ) -> Result<Task<Result<u64>>> {
175 if message.text.trim().is_empty() {
176 Err(anyhow!("message body can't be empty"))?;
177 }
178
179 let current_user = self
180 .user_store
181 .read(cx)
182 .current_user()
183 .ok_or_else(|| anyhow!("current_user is not present"))?;
184
185 let channel_id = self.channel_id;
186 let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
187 let nonce = self.rng.gen();
188 self.insert_messages(
189 SumTree::from_item(
190 ChannelMessage {
191 id: pending_id,
192 body: message.text.clone(),
193 sender: current_user,
194 timestamp: OffsetDateTime::now_utc(),
195 mentions: message.mentions.clone(),
196 nonce,
197 reply_to_message_id: message.reply_to_message_id,
198 edited_at: None,
199 },
200 &(),
201 ),
202 cx,
203 );
204 let user_store = self.user_store.clone();
205 let rpc = self.rpc.clone();
206 let outgoing_messages_lock = self.outgoing_messages_lock.clone();
207
208 // todo - handle messages that fail to send (e.g. >1024 chars)
209 Ok(cx.spawn(move |this, mut cx| async move {
210 let outgoing_message_guard = outgoing_messages_lock.lock().await;
211 let request = rpc.request(proto::SendChannelMessage {
212 channel_id: channel_id.0,
213 body: message.text,
214 nonce: Some(nonce.into()),
215 mentions: mentions_to_proto(&message.mentions),
216 reply_to_message_id: message.reply_to_message_id,
217 });
218 let response = request.await?;
219 drop(outgoing_message_guard);
220 let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
221 let id = response.id;
222 let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
223 this.update(&mut cx, |this, cx| {
224 this.insert_messages(SumTree::from_item(message, &()), cx);
225 if this.first_loaded_message_id.is_none() {
226 this.first_loaded_message_id = Some(id);
227 }
228 })?;
229 Ok(id)
230 }))
231 }
232
233 pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
234 let response = self.rpc.request(proto::RemoveChannelMessage {
235 channel_id: self.channel_id.0,
236 message_id: id,
237 });
238 cx.spawn(move |this, mut cx| async move {
239 response.await?;
240 this.update(&mut cx, |this, cx| {
241 this.message_removed(id, cx);
242 })?;
243 Ok(())
244 })
245 }
246
247 pub fn update_message(
248 &mut self,
249 id: u64,
250 message: MessageParams,
251 cx: &mut ModelContext<Self>,
252 ) -> Result<Task<Result<()>>> {
253 self.message_update(
254 ChannelMessageId::Saved(id),
255 message.text.clone(),
256 message.mentions.clone(),
257 Some(OffsetDateTime::now_utc()),
258 cx,
259 );
260
261 let nonce: u128 = self.rng.gen();
262
263 let request = self.rpc.request(proto::UpdateChannelMessage {
264 channel_id: self.channel_id.0,
265 message_id: id,
266 body: message.text,
267 nonce: Some(nonce.into()),
268 mentions: mentions_to_proto(&message.mentions),
269 });
270 Ok(cx.spawn(move |_, _| async move {
271 request.await?;
272 Ok(())
273 }))
274 }
275
276 pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Option<()>>> {
277 if self.loaded_all_messages {
278 return None;
279 }
280
281 let rpc = self.rpc.clone();
282 let user_store = self.user_store.clone();
283 let channel_id = self.channel_id;
284 let before_message_id = self.first_loaded_message_id()?;
285 Some(cx.spawn(move |this, mut cx| {
286 async move {
287 let response = rpc
288 .request(proto::GetChannelMessages {
289 channel_id: channel_id.0,
290 before_message_id,
291 })
292 .await?;
293 Self::handle_loaded_messages(
294 this,
295 user_store,
296 rpc,
297 response.messages,
298 response.done,
299 &mut cx,
300 )
301 .await?;
302
303 anyhow::Ok(())
304 }
305 .log_err()
306 }))
307 }
308
309 pub fn first_loaded_message_id(&mut self) -> Option<u64> {
310 self.first_loaded_message_id
311 }
312
313 /// Load a message by its id, if it's already stored locally.
314 pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
315 self.messages.iter().find(|message| match message.id {
316 ChannelMessageId::Saved(message_id) => message_id == id,
317 ChannelMessageId::Pending(_) => false,
318 })
319 }
320
321 /// Load all of the chat messages since a certain message id.
322 ///
323 /// For now, we always maintain a suffix of the channel's messages.
324 pub async fn load_history_since_message(
325 chat: Model<Self>,
326 message_id: u64,
327 mut cx: AsyncAppContext,
328 ) -> Option<usize> {
329 loop {
330 let step = chat
331 .update(&mut cx, |chat, cx| {
332 if let Some(first_id) = chat.first_loaded_message_id() {
333 if first_id <= message_id {
334 let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>();
335 let message_id = ChannelMessageId::Saved(message_id);
336 cursor.seek(&message_id, Bias::Left, &());
337 return ControlFlow::Break(
338 if cursor
339 .item()
340 .map_or(false, |message| message.id == message_id)
341 {
342 Some(cursor.start().1 .0)
343 } else {
344 None
345 },
346 );
347 }
348 }
349 ControlFlow::Continue(chat.load_more_messages(cx))
350 })
351 .log_err()?;
352 match step {
353 ControlFlow::Break(ix) => return ix,
354 ControlFlow::Continue(task) => task?.await?,
355 }
356 }
357 }
358
359 pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext<Self>) {
360 if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
361 if self
362 .last_acknowledged_id
363 .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
364 {
365 self.rpc
366 .send(proto::AckChannelMessage {
367 channel_id: self.channel_id.0,
368 message_id: latest_message_id,
369 })
370 .ok();
371 self.last_acknowledged_id = Some(latest_message_id);
372 self.channel_store.update(cx, |store, cx| {
373 store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
374 });
375 }
376 }
377 }
378
379 async fn handle_loaded_messages(
380 this: WeakModel<Self>,
381 user_store: Model<UserStore>,
382 rpc: Arc<Client>,
383 proto_messages: Vec<proto::ChannelMessage>,
384 loaded_all_messages: bool,
385 cx: &mut AsyncAppContext,
386 ) -> Result<()> {
387 let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
388
389 let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
390 let loaded_message_ids = this.update(cx, |this, _| {
391 let mut loaded_message_ids: HashSet<u64> = HashSet::default();
392 for message in loaded_messages.iter() {
393 if let Some(saved_message_id) = message.id.into() {
394 loaded_message_ids.insert(saved_message_id);
395 }
396 }
397 for message in this.messages.iter() {
398 if let Some(saved_message_id) = message.id.into() {
399 loaded_message_ids.insert(saved_message_id);
400 }
401 }
402 loaded_message_ids
403 })?;
404
405 let missing_ancestors = loaded_messages
406 .iter()
407 .filter_map(|message| {
408 if let Some(ancestor_id) = message.reply_to_message_id {
409 if !loaded_message_ids.contains(&ancestor_id) {
410 return Some(ancestor_id);
411 }
412 }
413 None
414 })
415 .collect::<Vec<_>>();
416
417 let loaded_ancestors = if missing_ancestors.is_empty() {
418 None
419 } else {
420 let response = rpc
421 .request(proto::GetChannelMessagesById {
422 message_ids: missing_ancestors,
423 })
424 .await?;
425 Some(messages_from_proto(response.messages, &user_store, cx).await?)
426 };
427 this.update(cx, |this, cx| {
428 this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
429 this.loaded_all_messages = loaded_all_messages;
430 this.insert_messages(loaded_messages, cx);
431 if let Some(loaded_ancestors) = loaded_ancestors {
432 this.insert_messages(loaded_ancestors, cx);
433 }
434 })?;
435
436 Ok(())
437 }
438
439 pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
440 let user_store = self.user_store.clone();
441 let rpc = self.rpc.clone();
442 let channel_id = self.channel_id;
443 cx.spawn(move |this, mut cx| {
444 async move {
445 let response = rpc
446 .request(proto::JoinChannelChat {
447 channel_id: channel_id.0,
448 })
449 .await?;
450 Self::handle_loaded_messages(
451 this.clone(),
452 user_store.clone(),
453 rpc.clone(),
454 response.messages,
455 response.done,
456 &mut cx,
457 )
458 .await?;
459
460 let pending_messages = this.update(&mut cx, |this, _| {
461 this.pending_messages().cloned().collect::<Vec<_>>()
462 })?;
463
464 for pending_message in pending_messages {
465 let request = rpc.request(proto::SendChannelMessage {
466 channel_id: channel_id.0,
467 body: pending_message.body,
468 mentions: mentions_to_proto(&pending_message.mentions),
469 nonce: Some(pending_message.nonce.into()),
470 reply_to_message_id: pending_message.reply_to_message_id,
471 });
472 let response = request.await?;
473 let message = ChannelMessage::from_proto(
474 response.message.ok_or_else(|| anyhow!("invalid message"))?,
475 &user_store,
476 &mut cx,
477 )
478 .await?;
479 this.update(&mut cx, |this, cx| {
480 this.insert_messages(SumTree::from_item(message, &()), cx);
481 })?;
482 }
483
484 anyhow::Ok(())
485 }
486 .log_err()
487 })
488 .detach();
489 }
490
491 pub fn message_count(&self) -> usize {
492 self.messages.summary().count
493 }
494
    /// Returns the tree holding all locally loaded messages.
    pub fn messages(&self) -> &SumTree<ChannelMessage> {
        &self.messages
    }
498
499 pub fn message(&self, ix: usize) -> &ChannelMessage {
500 let mut cursor = self.messages.cursor::<Count>();
501 cursor.seek(&Count(ix), Bias::Right, &());
502 cursor.item().unwrap()
503 }
504
505 pub fn acknowledge_message(&mut self, id: u64) {
506 if self.acknowledged_message_ids.insert(id) {
507 self.rpc
508 .send(proto::AckChannelMessage {
509 channel_id: self.channel_id.0,
510 message_id: id,
511 })
512 .ok();
513 }
514 }
515
516 pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
517 let mut cursor = self.messages.cursor::<Count>();
518 cursor.seek(&Count(range.start), Bias::Right, &());
519 cursor.take(range.len())
520 }
521
522 pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
523 let mut cursor = self.messages.cursor::<ChannelMessageId>();
524 cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
525 cursor
526 }
527
528 async fn handle_message_sent(
529 this: Model<Self>,
530 message: TypedEnvelope<proto::ChannelMessageSent>,
531 mut cx: AsyncAppContext,
532 ) -> Result<()> {
533 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
534 let message = message
535 .payload
536 .message
537 .ok_or_else(|| anyhow!("empty message"))?;
538 let message_id = message.id;
539
540 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
541 this.update(&mut cx, |this, cx| {
542 this.insert_messages(SumTree::from_item(message, &()), cx);
543 cx.emit(ChannelChatEvent::NewMessage {
544 channel_id: this.channel_id,
545 message_id,
546 })
547 })?;
548
549 Ok(())
550 }
551
552 async fn handle_message_removed(
553 this: Model<Self>,
554 message: TypedEnvelope<proto::RemoveChannelMessage>,
555 mut cx: AsyncAppContext,
556 ) -> Result<()> {
557 this.update(&mut cx, |this, cx| {
558 this.message_removed(message.payload.message_id, cx)
559 })?;
560 Ok(())
561 }
562
563 async fn handle_message_updated(
564 this: Model<Self>,
565 message: TypedEnvelope<proto::ChannelMessageUpdate>,
566 mut cx: AsyncAppContext,
567 ) -> Result<()> {
568 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
569 let message = message
570 .payload
571 .message
572 .ok_or_else(|| anyhow!("empty message"))?;
573
574 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
575
576 this.update(&mut cx, |this, cx| {
577 this.message_update(
578 message.id,
579 message.body,
580 message.mentions,
581 message.edited_at,
582 cx,
583 )
584 })?;
585 Ok(())
586 }
587
    /// Merges `messages` into the locally loaded messages and emits the matching events.
    ///
    /// Existing messages located between the first and the last incoming id are replaced
    /// by the incoming ones. When the merged run ends with a saved message, pending
    /// messages whose nonce matches an incoming message are dropped as well.
    /// Does nothing when `messages` is empty.
    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
            // Nonces of the incoming messages, used below to recognize pending duplicates.
            let nonces = messages
                .cursor::<()>()
                .map(|m| m.nonce)
                .collect::<HashSet<_>>();

            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
            // Keep the existing messages that precede the first incoming id.
            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
            let start_ix = old_cursor.start().1 .0;
            // Existing messages from there through the last incoming id are replaced.
            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
            let removed_count = removed_messages.summary().count;
            let new_count = messages.summary().count;
            let end_ix = start_ix + removed_count;

            new_messages.append(messages, &());

            // Index ranges (in the old tree) of pending messages that get dropped.
            let mut ranges = Vec::<Range<usize>>::new();
            if new_messages.last().unwrap().is_pending() {
                // The incoming run ends with a pending message: keep the rest untouched.
                new_messages.append(old_cursor.suffix(&()), &());
            } else {
                // Keep the remaining saved messages, up to the first pending one.
                new_messages.append(
                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
                    &(),
                );

                // Walk the pending messages: drop those whose nonce matches an incoming
                // message and keep the others.
                while let Some(message) = old_cursor.item() {
                    let message_ix = old_cursor.start().1 .0;
                    if nonces.contains(&message.nonce) {
                        // Extend the previous range when this index is adjacent to it.
                        if ranges.last().map_or(false, |r| r.end == message_ix) {
                            ranges.last_mut().unwrap().end += 1;
                        } else {
                            ranges.push(message_ix..message_ix + 1);
                        }
                    } else {
                        new_messages.push(message.clone(), &());
                    }
                    old_cursor.next(&());
                }
            }

            drop(old_cursor);
            self.messages = new_messages;

            // Dropped pending ranges are reported from the highest index to the lowest.
            for range in ranges.into_iter().rev() {
                cx.emit(ChannelChatEvent::MessagesUpdated {
                    old_range: range,
                    new_count: 0,
                });
            }
            // Then report the replaced range itself.
            cx.emit(ChannelChatEvent::MessagesUpdated {
                old_range: start_ix..end_ix,
                new_count,
            });

            cx.notify();
        }
    }
646
647 fn message_removed(&mut self, id: u64, cx: &mut ModelContext<Self>) {
648 let mut cursor = self.messages.cursor::<ChannelMessageId>();
649 let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
650 if let Some(item) = cursor.item() {
651 if item.id == ChannelMessageId::Saved(id) {
652 let deleted_message_ix = messages.summary().count;
653 cursor.next(&());
654 messages.append(cursor.suffix(&()), &());
655 drop(cursor);
656 self.messages = messages;
657
658 // If the message that was deleted was the last acknowledged message,
659 // replace the acknowledged message with an earlier one.
660 self.channel_store.update(cx, |store, _| {
661 let summary = self.messages.summary();
662 if summary.count == 0 {
663 store.set_acknowledged_message_id(self.channel_id, None);
664 } else if deleted_message_ix == summary.count {
665 if let ChannelMessageId::Saved(id) = summary.max_id {
666 store.set_acknowledged_message_id(self.channel_id, Some(id));
667 }
668 }
669 });
670
671 cx.emit(ChannelChatEvent::MessagesUpdated {
672 old_range: deleted_message_ix..deleted_message_ix + 1,
673 new_count: 0,
674 });
675 }
676 }
677 }
678
679 fn message_update(
680 &mut self,
681 id: ChannelMessageId,
682 body: String,
683 mentions: Vec<(Range<usize>, u64)>,
684 edited_at: Option<OffsetDateTime>,
685 cx: &mut ModelContext<Self>,
686 ) {
687 let mut cursor = self.messages.cursor::<ChannelMessageId>();
688 let mut messages = cursor.slice(&id, Bias::Left, &());
689 let ix = messages.summary().count;
690
691 if let Some(mut message_to_update) = cursor.item().cloned() {
692 message_to_update.body = body;
693 message_to_update.mentions = mentions;
694 message_to_update.edited_at = edited_at;
695 messages.push(message_to_update, &());
696 cursor.next(&());
697 }
698
699 messages.append(cursor.suffix(&()), &());
700 drop(cursor);
701 self.messages = messages;
702
703 cx.emit(ChannelChatEvent::UpdateMessage {
704 message_ix: ix,
705 message_id: id,
706 });
707
708 cx.notify();
709 }
710}
711
712async fn messages_from_proto(
713 proto_messages: Vec<proto::ChannelMessage>,
714 user_store: &Model<UserStore>,
715 cx: &mut AsyncAppContext,
716) -> Result<SumTree<ChannelMessage>> {
717 let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
718 let mut result = SumTree::new();
719 result.extend(messages, &());
720 Ok(result)
721}
722
723impl ChannelMessage {
724 pub async fn from_proto(
725 message: proto::ChannelMessage,
726 user_store: &Model<UserStore>,
727 cx: &mut AsyncAppContext,
728 ) -> Result<Self> {
729 let sender = user_store
730 .update(cx, |user_store, cx| {
731 user_store.get_user(message.sender_id, cx)
732 })?
733 .await?;
734
735 let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
736 if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
737 return Some(a);
738 }
739
740 None
741 });
742
743 Ok(ChannelMessage {
744 id: ChannelMessageId::Saved(message.id),
745 body: message.body,
746 mentions: message
747 .mentions
748 .into_iter()
749 .filter_map(|mention| {
750 let range = mention.range?;
751 Some((range.start as usize..range.end as usize, mention.user_id))
752 })
753 .collect(),
754 timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
755 sender,
756 nonce: message
757 .nonce
758 .ok_or_else(|| anyhow!("nonce is required"))?
759 .into(),
760 reply_to_message_id: message.reply_to_message_id,
761 edited_at,
762 })
763 }
764
765 pub fn is_pending(&self) -> bool {
766 matches!(self.id, ChannelMessageId::Pending(_))
767 }
768
769 pub async fn from_proto_vec(
770 proto_messages: Vec<proto::ChannelMessage>,
771 user_store: &Model<UserStore>,
772 cx: &mut AsyncAppContext,
773 ) -> Result<Vec<Self>> {
774 let unique_user_ids = proto_messages
775 .iter()
776 .map(|m| m.sender_id)
777 .collect::<HashSet<_>>()
778 .into_iter()
779 .collect();
780 user_store
781 .update(cx, |user_store, cx| {
782 user_store.get_users(unique_user_ids, cx)
783 })?
784 .await?;
785
786 let mut messages = Vec::with_capacity(proto_messages.len());
787 for message in proto_messages {
788 messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
789 }
790 Ok(messages)
791 }
792}
793
794pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
795 mentions
796 .iter()
797 .map(|(range, user_id)| proto::ChatMention {
798 range: Some(proto::Range {
799 start: range.start as u64,
800 end: range.end as u64,
801 }),
802 user_id: *user_id,
803 })
804 .collect()
805}
806
807impl sum_tree::Item for ChannelMessage {
808 type Summary = ChannelMessageSummary;
809
810 fn summary(&self) -> Self::Summary {
811 ChannelMessageSummary {
812 max_id: self.id,
813 count: 1,
814 }
815 }
816}
817
818impl Default for ChannelMessageId {
819 fn default() -> Self {
820 Self::Saved(0)
821 }
822}
823
824impl sum_tree::Summary for ChannelMessageSummary {
825 type Context = ();
826
827 fn add_summary(&mut self, summary: &Self, _: &()) {
828 self.max_id = summary.max_id;
829 self.count += summary.count;
830 }
831}
832
833impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
834 fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
835 debug_assert!(summary.max_id > *self);
836 *self = summary.max_id;
837 }
838}
839
840impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
841 fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
842 self.0 += summary.count;
843 }
844}
845
846impl<'a> From<&'a str> for MessageParams {
847 fn from(value: &'a str) -> Self {
848 Self {
849 text: value.into(),
850 mentions: Vec::new(),
851 reply_to_message_id: None,
852 }
853 }
854}