channel_chat.rs

  1use crate::{Channel, ChannelStore};
  2use anyhow::{anyhow, Result};
  3use client::{
  4    proto,
  5    user::{User, UserStore},
  6    ChannelId, Client, Subscription, TypedEnvelope, UserId,
  7};
  8use collections::HashSet;
  9use futures::lock::Mutex;
 10use gpui::{
 11    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
 12};
 13use rand::prelude::*;
 14use std::{
 15    ops::{ControlFlow, Range},
 16    sync::Arc,
 17};
 18use sum_tree::{Bias, SumTree};
 19use time::OffsetDateTime;
 20use util::{post_inc, ResultExt as _, TryFutureExt};
 21
/// Client-side state for the chat of a single channel.
///
/// Holds the locally loaded messages (saved and pending) together with the
/// handles used to talk to the server and to the channel and user stores.
pub struct ChannelChat {
    /// Id of the channel this chat belongs to.
    pub channel_id: ChannelId,
    /// Locally loaded messages, summarized by maximum id and message count.
    messages: SumTree<ChannelMessage>,
    /// Ids for which `acknowledge_message` has already sent an acknowledgement.
    acknowledged_message_ids: HashSet<u64>,
    /// Store used to look up the channel and to record acknowledged message ids.
    channel_store: Model<ChannelStore>,
    /// The `done` flag of the most recently processed message-loading response.
    loaded_all_messages: bool,
    /// Highest message id acknowledged through `acknowledge_last_message`.
    last_acknowledged_id: Option<u64>,
    /// Counter used to allocate `ChannelMessageId::Pending` ids.
    next_pending_message_id: usize,
    /// Id of the first message of the most recently loaded batch, when that
    /// message is a saved one.
    first_loaded_message_id: Option<u64>,
    /// Store used to resolve message senders and the current user.
    user_store: Model<UserStore>,
    /// Client used for every request and notification sent to the server.
    rpc: Arc<Client>,
    /// Lock held by `send_message` while its request is in flight.
    outgoing_messages_lock: Arc<Mutex<()>>,
    /// Random number generator used to create message nonces.
    rng: StdRng,
    /// Entity subscription obtained in `new`; kept as a field so it is only
    /// dropped together with the chat.
    _subscription: Subscription,
}
 37
/// Content of a message to send, or the new content of an edited message.
#[derive(Debug, PartialEq, Eq)]
pub struct MessageParams {
    /// Body text of the message.
    pub text: String,
    /// Mentioned users as `(range, user id)` pairs.
    /// NOTE(review): the ranges presumably index into `text` — confirm with callers.
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message being replied to, if any.
    pub reply_to_message_id: Option<u64>,
}
 44
/// A single chat message, either saved on the server or still pending locally.
#[derive(Clone, Debug)]
pub struct ChannelMessage {
    /// Saved (server) id or pending (local) id of the message.
    pub id: ChannelMessageId,
    /// Body text of the message.
    pub body: String,
    /// Time of the message: converted from the server's unix timestamp for
    /// saved messages, `OffsetDateTime::now_utc()` for locally created ones.
    pub timestamp: OffsetDateTime,
    /// The user who sent the message.
    pub sender: Arc<User>,
    /// Random value attached when the message is sent; `insert_messages`
    /// uses it to match a pending message with its saved counterpart.
    pub nonce: u128,
    /// Mentioned users as `(range, user id)` pairs.
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message this one replies to, if any.
    pub reply_to_message_id: Option<u64>,
    /// Time of the last edit, if the message has been edited.
    pub edited_at: Option<OffsetDateTime>,
}
 56
 57#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 58pub enum ChannelMessageId {
 59    Saved(u64),
 60    Pending(usize),
 61}
 62
 63impl Into<Option<u64>> for ChannelMessageId {
 64    fn into(self) -> Option<u64> {
 65        match self {
 66            ChannelMessageId::Saved(id) => Some(id),
 67            ChannelMessageId::Pending(_) => None,
 68        }
 69    }
 70}
 71
/// Summary stored in the message tree for a run of messages.
#[derive(Clone, Debug, Default)]
pub struct ChannelMessageSummary {
    /// Id of the last summarized message (`add_summary` overwrites it with
    /// the id of the summary being appended).
    max_id: ChannelMessageId,
    /// Number of summarized messages.
    count: usize,
}
 77
/// Tree dimension that counts messages; used to seek to a message by index.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct Count(usize);
 80
/// Events emitted by a [`ChannelChat`] when its message list changes.
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelChatEvent {
    /// A range of the message list was replaced or removed.
    /// Emitted by `insert_messages` and `message_removed`.
    MessagesUpdated {
        /// Index range, in the previous list, of the messages that were replaced.
        old_range: Range<usize>,
        /// Number of messages that now stand in place of `old_range`
        /// (`0` when messages were only removed).
        new_count: usize,
    },
    /// The content of a single message changed. Emitted by `message_update`.
    UpdateMessage {
        /// Id of the message that changed.
        message_id: ChannelMessageId,
        /// Index of that message in the list.
        message_ix: usize,
    },
    /// A message sent notification was received from the server.
    /// Emitted by `handle_message_sent`.
    NewMessage {
        /// Channel the message belongs to.
        channel_id: ChannelId,
        /// Server id of the new message.
        message_id: u64,
    },
}

impl EventEmitter<ChannelChatEvent> for ChannelChat {}
/// Registers the `ChannelChat` model message handlers on `client`: one each
/// for sent, removed and updated channel messages.
pub fn init(client: &Arc<Client>) {
    client.add_model_message_handler(ChannelChat::handle_message_sent);
    client.add_model_message_handler(ChannelChat::handle_message_removed);
    client.add_model_message_handler(ChannelChat::handle_message_updated);
}
103
104impl ChannelChat {
105    pub async fn new(
106        channel: Arc<Channel>,
107        channel_store: Model<ChannelStore>,
108        user_store: Model<UserStore>,
109        client: Arc<Client>,
110        mut cx: AsyncAppContext,
111    ) -> Result<Model<Self>> {
112        let channel_id = channel.id;
113        let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
114
115        let response = client
116            .request(proto::JoinChannelChat {
117                channel_id: channel_id.0,
118            })
119            .await?;
120
121        let handle = cx.new_model(|cx| {
122            cx.on_release(Self::release).detach();
123            Self {
124                channel_id: channel.id,
125                user_store: user_store.clone(),
126                channel_store,
127                rpc: client.clone(),
128                outgoing_messages_lock: Default::default(),
129                messages: Default::default(),
130                acknowledged_message_ids: Default::default(),
131                loaded_all_messages: false,
132                next_pending_message_id: 0,
133                last_acknowledged_id: None,
134                rng: StdRng::from_entropy(),
135                first_loaded_message_id: None,
136                _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
137            }
138        })?;
139        Self::handle_loaded_messages(
140            handle.downgrade(),
141            user_store,
142            client,
143            response.messages,
144            response.done,
145            &mut cx,
146        )
147        .await?;
148        Ok(handle)
149    }
150
151    fn release(&mut self, _: &mut AppContext) {
152        self.rpc
153            .send(proto::LeaveChannelChat {
154                channel_id: self.channel_id.0,
155            })
156            .log_err();
157    }
158
159    pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
160        self.channel_store
161            .read(cx)
162            .channel_for_id(self.channel_id)
163            .cloned()
164    }
165
    /// Returns the client this chat uses to communicate with the server.
    pub fn client(&self) -> &Arc<Client> {
        &self.rpc
    }
169
170    pub fn send_message(
171        &mut self,
172        message: MessageParams,
173        cx: &mut ModelContext<Self>,
174    ) -> Result<Task<Result<u64>>> {
175        if message.text.trim().is_empty() {
176            Err(anyhow!("message body can't be empty"))?;
177        }
178
179        let current_user = self
180            .user_store
181            .read(cx)
182            .current_user()
183            .ok_or_else(|| anyhow!("current_user is not present"))?;
184
185        let channel_id = self.channel_id;
186        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
187        let nonce = self.rng.gen();
188        self.insert_messages(
189            SumTree::from_item(
190                ChannelMessage {
191                    id: pending_id,
192                    body: message.text.clone(),
193                    sender: current_user,
194                    timestamp: OffsetDateTime::now_utc(),
195                    mentions: message.mentions.clone(),
196                    nonce,
197                    reply_to_message_id: message.reply_to_message_id,
198                    edited_at: None,
199                },
200                &(),
201            ),
202            cx,
203        );
204        let user_store = self.user_store.clone();
205        let rpc = self.rpc.clone();
206        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
207
208        // todo - handle messages that fail to send (e.g. >1024 chars)
209        Ok(cx.spawn(move |this, mut cx| async move {
210            let outgoing_message_guard = outgoing_messages_lock.lock().await;
211            let request = rpc.request(proto::SendChannelMessage {
212                channel_id: channel_id.0,
213                body: message.text,
214                nonce: Some(nonce.into()),
215                mentions: mentions_to_proto(&message.mentions),
216                reply_to_message_id: message.reply_to_message_id,
217            });
218            let response = request.await?;
219            drop(outgoing_message_guard);
220            let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
221            let id = response.id;
222            let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
223            this.update(&mut cx, |this, cx| {
224                this.insert_messages(SumTree::from_item(message, &()), cx);
225            })?;
226            Ok(id)
227        }))
228    }
229
230    pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
231        let response = self.rpc.request(proto::RemoveChannelMessage {
232            channel_id: self.channel_id.0,
233            message_id: id,
234        });
235        cx.spawn(move |this, mut cx| async move {
236            response.await?;
237            this.update(&mut cx, |this, cx| {
238                this.message_removed(id, cx);
239            })?;
240            Ok(())
241        })
242    }
243
244    pub fn update_message(
245        &mut self,
246        id: u64,
247        message: MessageParams,
248        cx: &mut ModelContext<Self>,
249    ) -> Result<Task<Result<()>>> {
250        self.message_update(
251            ChannelMessageId::Saved(id),
252            message.text.clone(),
253            message.mentions.clone(),
254            Some(OffsetDateTime::now_utc()),
255            cx,
256        );
257
258        let nonce: u128 = self.rng.gen();
259
260        let request = self.rpc.request(proto::UpdateChannelMessage {
261            channel_id: self.channel_id.0,
262            message_id: id,
263            body: message.text,
264            nonce: Some(nonce.into()),
265            mentions: mentions_to_proto(&message.mentions),
266        });
267        Ok(cx.spawn(move |_, _| async move {
268            request.await?;
269            Ok(())
270        }))
271    }
272
273    pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Option<()>>> {
274        if self.loaded_all_messages {
275            return None;
276        }
277
278        let rpc = self.rpc.clone();
279        let user_store = self.user_store.clone();
280        let channel_id = self.channel_id;
281        let before_message_id = self.first_loaded_message_id()?;
282        Some(cx.spawn(move |this, mut cx| {
283            async move {
284                let response = rpc
285                    .request(proto::GetChannelMessages {
286                        channel_id: channel_id.0,
287                        before_message_id,
288                    })
289                    .await?;
290                Self::handle_loaded_messages(
291                    this,
292                    user_store,
293                    rpc,
294                    response.messages,
295                    response.done,
296                    &mut cx,
297                )
298                .await?;
299
300                anyhow::Ok(())
301            }
302            .log_err()
303        }))
304    }
305
306    pub fn first_loaded_message_id(&mut self) -> Option<u64> {
307        self.first_loaded_message_id
308    }
309
310    /// Load a message by its id, if it's already stored locally.
311    pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
312        self.messages.iter().find(|message| match message.id {
313            ChannelMessageId::Saved(message_id) => message_id == id,
314            ChannelMessageId::Pending(_) => false,
315        })
316    }
317
318    /// Load all of the chat messages since a certain message id.
319    ///
320    /// For now, we always maintain a suffix of the channel's messages.
321    pub async fn load_history_since_message(
322        chat: Model<Self>,
323        message_id: u64,
324        mut cx: AsyncAppContext,
325    ) -> Option<usize> {
326        loop {
327            let step = chat
328                .update(&mut cx, |chat, cx| {
329                    if let Some(first_id) = chat.first_loaded_message_id() {
330                        if first_id <= message_id {
331                            let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>();
332                            let message_id = ChannelMessageId::Saved(message_id);
333                            cursor.seek(&message_id, Bias::Left, &());
334                            return ControlFlow::Break(
335                                if cursor
336                                    .item()
337                                    .map_or(false, |message| message.id == message_id)
338                                {
339                                    Some(cursor.start().1 .0)
340                                } else {
341                                    None
342                                },
343                            );
344                        }
345                    }
346                    ControlFlow::Continue(chat.load_more_messages(cx))
347                })
348                .log_err()?;
349            match step {
350                ControlFlow::Break(ix) => return ix,
351                ControlFlow::Continue(task) => task?.await?,
352            }
353        }
354    }
355
356    pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext<Self>) {
357        if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
358            if self
359                .last_acknowledged_id
360                .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
361            {
362                self.rpc
363                    .send(proto::AckChannelMessage {
364                        channel_id: self.channel_id.0,
365                        message_id: latest_message_id,
366                    })
367                    .ok();
368                self.last_acknowledged_id = Some(latest_message_id);
369                self.channel_store.update(cx, |store, cx| {
370                    store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
371                });
372            }
373        }
374    }
375
376    async fn handle_loaded_messages(
377        this: WeakModel<Self>,
378        user_store: Model<UserStore>,
379        rpc: Arc<Client>,
380        proto_messages: Vec<proto::ChannelMessage>,
381        loaded_all_messages: bool,
382        cx: &mut AsyncAppContext,
383    ) -> Result<()> {
384        let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
385
386        let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
387        let loaded_message_ids = this.update(cx, |this, _| {
388            let mut loaded_message_ids: HashSet<u64> = HashSet::default();
389            for message in loaded_messages.iter() {
390                if let Some(saved_message_id) = message.id.into() {
391                    loaded_message_ids.insert(saved_message_id);
392                }
393            }
394            for message in this.messages.iter() {
395                if let Some(saved_message_id) = message.id.into() {
396                    loaded_message_ids.insert(saved_message_id);
397                }
398            }
399            loaded_message_ids
400        })?;
401
402        let missing_ancestors = loaded_messages
403            .iter()
404            .filter_map(|message| {
405                if let Some(ancestor_id) = message.reply_to_message_id {
406                    if !loaded_message_ids.contains(&ancestor_id) {
407                        return Some(ancestor_id);
408                    }
409                }
410                None
411            })
412            .collect::<Vec<_>>();
413
414        let loaded_ancestors = if missing_ancestors.is_empty() {
415            None
416        } else {
417            let response = rpc
418                .request(proto::GetChannelMessagesById {
419                    message_ids: missing_ancestors,
420                })
421                .await?;
422            Some(messages_from_proto(response.messages, &user_store, cx).await?)
423        };
424        this.update(cx, |this, cx| {
425            this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
426            this.loaded_all_messages = loaded_all_messages;
427            this.insert_messages(loaded_messages, cx);
428            if let Some(loaded_ancestors) = loaded_ancestors {
429                this.insert_messages(loaded_ancestors, cx);
430            }
431        })?;
432
433        Ok(())
434    }
435
436    pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
437        let user_store = self.user_store.clone();
438        let rpc = self.rpc.clone();
439        let channel_id = self.channel_id;
440        cx.spawn(move |this, mut cx| {
441            async move {
442                let response = rpc
443                    .request(proto::JoinChannelChat {
444                        channel_id: channel_id.0,
445                    })
446                    .await?;
447                Self::handle_loaded_messages(
448                    this.clone(),
449                    user_store.clone(),
450                    rpc.clone(),
451                    response.messages,
452                    response.done,
453                    &mut cx,
454                )
455                .await?;
456
457                let pending_messages = this.update(&mut cx, |this, _| {
458                    this.pending_messages().cloned().collect::<Vec<_>>()
459                })?;
460
461                for pending_message in pending_messages {
462                    let request = rpc.request(proto::SendChannelMessage {
463                        channel_id: channel_id.0,
464                        body: pending_message.body,
465                        mentions: mentions_to_proto(&pending_message.mentions),
466                        nonce: Some(pending_message.nonce.into()),
467                        reply_to_message_id: pending_message.reply_to_message_id,
468                    });
469                    let response = request.await?;
470                    let message = ChannelMessage::from_proto(
471                        response.message.ok_or_else(|| anyhow!("invalid message"))?,
472                        &user_store,
473                        &mut cx,
474                    )
475                    .await?;
476                    this.update(&mut cx, |this, cx| {
477                        this.insert_messages(SumTree::from_item(message, &()), cx);
478                    })?;
479                }
480
481                anyhow::Ok(())
482            }
483            .log_err()
484        })
485        .detach();
486    }
487
488    pub fn message_count(&self) -> usize {
489        self.messages.summary().count
490    }
491
    /// Returns the tree holding all locally loaded messages.
    pub fn messages(&self) -> &SumTree<ChannelMessage> {
        &self.messages
    }
495
496    pub fn message(&self, ix: usize) -> &ChannelMessage {
497        let mut cursor = self.messages.cursor::<Count>();
498        cursor.seek(&Count(ix), Bias::Right, &());
499        cursor.item().unwrap()
500    }
501
502    pub fn acknowledge_message(&mut self, id: u64) {
503        if self.acknowledged_message_ids.insert(id) {
504            self.rpc
505                .send(proto::AckChannelMessage {
506                    channel_id: self.channel_id.0,
507                    message_id: id,
508                })
509                .ok();
510        }
511    }
512
513    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
514        let mut cursor = self.messages.cursor::<Count>();
515        cursor.seek(&Count(range.start), Bias::Right, &());
516        cursor.take(range.len())
517    }
518
519    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
520        let mut cursor = self.messages.cursor::<ChannelMessageId>();
521        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
522        cursor
523    }
524
525    async fn handle_message_sent(
526        this: Model<Self>,
527        message: TypedEnvelope<proto::ChannelMessageSent>,
528        _: Arc<Client>,
529        mut cx: AsyncAppContext,
530    ) -> Result<()> {
531        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
532        let message = message
533            .payload
534            .message
535            .ok_or_else(|| anyhow!("empty message"))?;
536        let message_id = message.id;
537
538        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
539        this.update(&mut cx, |this, cx| {
540            this.insert_messages(SumTree::from_item(message, &()), cx);
541            cx.emit(ChannelChatEvent::NewMessage {
542                channel_id: this.channel_id,
543                message_id,
544            })
545        })?;
546
547        Ok(())
548    }
549
550    async fn handle_message_removed(
551        this: Model<Self>,
552        message: TypedEnvelope<proto::RemoveChannelMessage>,
553        _: Arc<Client>,
554        mut cx: AsyncAppContext,
555    ) -> Result<()> {
556        this.update(&mut cx, |this, cx| {
557            this.message_removed(message.payload.message_id, cx)
558        })?;
559        Ok(())
560    }
561
562    async fn handle_message_updated(
563        this: Model<Self>,
564        message: TypedEnvelope<proto::ChannelMessageUpdate>,
565        _: Arc<Client>,
566        mut cx: AsyncAppContext,
567    ) -> Result<()> {
568        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
569        let message = message
570            .payload
571            .message
572            .ok_or_else(|| anyhow!("empty message"))?;
573
574        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
575
576        this.update(&mut cx, |this, cx| {
577            this.message_update(
578                message.id,
579                message.body,
580                message.mentions,
581                message.edited_at,
582                cx,
583            )
584        })?;
585        Ok(())
586    }
587
    /// Merges `messages` into the loaded list and emits the matching events.
    ///
    /// Existing messages whose ids lie between the first and last incoming id
    /// are replaced by the incoming ones. When the last incoming message is a
    /// saved one, pending messages that share a nonce with an incoming
    /// message are dropped as well. Does nothing when `messages` is empty.
    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
            // Nonces of the incoming messages, used below to recognize pending
            // messages that these messages supersede.
            let nonces = messages
                .cursor::<()>()
                .map(|m| m.nonce)
                .collect::<HashSet<_>>();

            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
            // Keep everything that precedes the first incoming id.
            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
            let start_ix = old_cursor.start().1 .0;
            // Old messages up to and including the last incoming id are replaced.
            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
            let removed_count = removed_messages.summary().count;
            let new_count = messages.summary().count;
            let end_ix = start_ix + removed_count;

            new_messages.append(messages, &());

            // Index ranges (in the old list) of pending messages that get dropped.
            let mut ranges = Vec::<Range<usize>>::new();
            if new_messages.last().unwrap().is_pending() {
                // The incoming messages end with a pending one: keep the rest
                // of the old list unchanged.
                new_messages.append(old_cursor.suffix(&()), &());
            } else {
                // Keep the remaining saved messages as they are.
                new_messages.append(
                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
                    &(),
                );

                // Walk the old pending messages: drop those whose nonce matches
                // an incoming message, keep the others.
                while let Some(message) = old_cursor.item() {
                    let message_ix = old_cursor.start().1 .0;
                    if nonces.contains(&message.nonce) {
                        // Extend the previous range when this index is adjacent to it.
                        if ranges.last().map_or(false, |r| r.end == message_ix) {
                            ranges.last_mut().unwrap().end += 1;
                        } else {
                            ranges.push(message_ix..message_ix + 1);
                        }
                    } else {
                        new_messages.push(message.clone(), &());
                    }
                    old_cursor.next(&());
                }
            }

            // The cursor borrows `self.messages`; release it before replacing the tree.
            drop(old_cursor);
            self.messages = new_messages;

            // Report the dropped pending ranges from the highest index down,
            // then the replaced range itself.
            for range in ranges.into_iter().rev() {
                cx.emit(ChannelChatEvent::MessagesUpdated {
                    old_range: range,
                    new_count: 0,
                });
            }
            cx.emit(ChannelChatEvent::MessagesUpdated {
                old_range: start_ix..end_ix,
                new_count,
            });

            cx.notify();
        }
    }
646
647    fn message_removed(&mut self, id: u64, cx: &mut ModelContext<Self>) {
648        let mut cursor = self.messages.cursor::<ChannelMessageId>();
649        let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
650        if let Some(item) = cursor.item() {
651            if item.id == ChannelMessageId::Saved(id) {
652                let ix = messages.summary().count;
653                cursor.next(&());
654                messages.append(cursor.suffix(&()), &());
655                drop(cursor);
656                self.messages = messages;
657                cx.emit(ChannelChatEvent::MessagesUpdated {
658                    old_range: ix..ix + 1,
659                    new_count: 0,
660                });
661            }
662        }
663    }
664
665    fn message_update(
666        &mut self,
667        id: ChannelMessageId,
668        body: String,
669        mentions: Vec<(Range<usize>, u64)>,
670        edited_at: Option<OffsetDateTime>,
671        cx: &mut ModelContext<Self>,
672    ) {
673        let mut cursor = self.messages.cursor::<ChannelMessageId>();
674        let mut messages = cursor.slice(&id, Bias::Left, &());
675        let ix = messages.summary().count;
676
677        if let Some(mut message_to_update) = cursor.item().cloned() {
678            message_to_update.body = body;
679            message_to_update.mentions = mentions;
680            message_to_update.edited_at = edited_at;
681            messages.push(message_to_update, &());
682            cursor.next(&());
683        }
684
685        messages.append(cursor.suffix(&()), &());
686        drop(cursor);
687        self.messages = messages;
688
689        cx.emit(ChannelChatEvent::UpdateMessage {
690            message_ix: ix,
691            message_id: id,
692        });
693
694        cx.notify();
695    }
696}
697
698async fn messages_from_proto(
699    proto_messages: Vec<proto::ChannelMessage>,
700    user_store: &Model<UserStore>,
701    cx: &mut AsyncAppContext,
702) -> Result<SumTree<ChannelMessage>> {
703    let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
704    let mut result = SumTree::new();
705    result.extend(messages, &());
706    Ok(result)
707}
708
709impl ChannelMessage {
710    pub async fn from_proto(
711        message: proto::ChannelMessage,
712        user_store: &Model<UserStore>,
713        cx: &mut AsyncAppContext,
714    ) -> Result<Self> {
715        let sender = user_store
716            .update(cx, |user_store, cx| {
717                user_store.get_user(message.sender_id, cx)
718            })?
719            .await?;
720
721        let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
722            if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
723                return Some(a);
724            }
725
726            None
727        });
728
729        Ok(ChannelMessage {
730            id: ChannelMessageId::Saved(message.id),
731            body: message.body,
732            mentions: message
733                .mentions
734                .into_iter()
735                .filter_map(|mention| {
736                    let range = mention.range?;
737                    Some((range.start as usize..range.end as usize, mention.user_id))
738                })
739                .collect(),
740            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
741            sender,
742            nonce: message
743                .nonce
744                .ok_or_else(|| anyhow!("nonce is required"))?
745                .into(),
746            reply_to_message_id: message.reply_to_message_id,
747            edited_at,
748        })
749    }
750
751    pub fn is_pending(&self) -> bool {
752        matches!(self.id, ChannelMessageId::Pending(_))
753    }
754
755    pub async fn from_proto_vec(
756        proto_messages: Vec<proto::ChannelMessage>,
757        user_store: &Model<UserStore>,
758        cx: &mut AsyncAppContext,
759    ) -> Result<Vec<Self>> {
760        let unique_user_ids = proto_messages
761            .iter()
762            .map(|m| m.sender_id)
763            .collect::<HashSet<_>>()
764            .into_iter()
765            .collect();
766        user_store
767            .update(cx, |user_store, cx| {
768                user_store.get_users(unique_user_ids, cx)
769            })?
770            .await?;
771
772        let mut messages = Vec::with_capacity(proto_messages.len());
773        for message in proto_messages {
774            messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
775        }
776        Ok(messages)
777    }
778}
779
780pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
781    mentions
782        .iter()
783        .map(|(range, user_id)| proto::ChatMention {
784            range: Some(proto::Range {
785                start: range.start as u64,
786                end: range.end as u64,
787            }),
788            user_id: *user_id,
789        })
790        .collect()
791}
792
793impl sum_tree::Item for ChannelMessage {
794    type Summary = ChannelMessageSummary;
795
796    fn summary(&self) -> Self::Summary {
797        ChannelMessageSummary {
798            max_id: self.id,
799            count: 1,
800        }
801    }
802}
803
804impl Default for ChannelMessageId {
805    fn default() -> Self {
806        Self::Saved(0)
807    }
808}
809
810impl sum_tree::Summary for ChannelMessageSummary {
811    type Context = ();
812
813    fn add_summary(&mut self, summary: &Self, _: &()) {
814        self.max_id = summary.max_id;
815        self.count += summary.count;
816    }
817}
818
/// Allows seeking in the message tree by message id.
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
        // In debug builds, check that ids strictly increase along the tree.
        debug_assert!(summary.max_id > *self);
        *self = summary.max_id;
    }
}
825
826impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
827    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
828        self.0 += summary.count;
829    }
830}
831
832impl<'a> From<&'a str> for MessageParams {
833    fn from(value: &'a str) -> Self {
834        Self {
835            text: value.into(),
836            mentions: Vec::new(),
837            reply_to_message_id: None,
838        }
839    }
840}