channel_chat.rs

  1use crate::{Channel, ChannelStore};
  2use anyhow::{Result, anyhow};
  3use client::{
  4    ChannelId, Client, Subscription, TypedEnvelope, UserId, proto,
  5    user::{User, UserStore},
  6};
  7use collections::HashSet;
  8use futures::lock::Mutex;
  9use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
 10use rand::prelude::*;
 11use rpc::AnyProtoClient;
 12use std::{
 13    ops::{ControlFlow, Range},
 14    sync::Arc,
 15};
 16use sum_tree::{Bias, SumTree};
 17use time::OffsetDateTime;
 18use util::{ResultExt as _, TryFutureExt, post_inc};
 19
 20pub struct ChannelChat {
 21    pub channel_id: ChannelId,
 22    messages: SumTree<ChannelMessage>,
 23    acknowledged_message_ids: HashSet<u64>,
 24    channel_store: Entity<ChannelStore>,
 25    loaded_all_messages: bool,
 26    last_acknowledged_id: Option<u64>,
 27    next_pending_message_id: usize,
 28    first_loaded_message_id: Option<u64>,
 29    user_store: Entity<UserStore>,
 30    rpc: Arc<Client>,
 31    outgoing_messages_lock: Arc<Mutex<()>>,
 32    rng: StdRng,
 33    _subscription: Subscription,
 34}
 35
 36#[derive(Debug, PartialEq, Eq)]
 37pub struct MessageParams {
 38    pub text: String,
 39    pub mentions: Vec<(Range<usize>, UserId)>,
 40    pub reply_to_message_id: Option<u64>,
 41}
 42
 43#[derive(Clone, Debug)]
 44pub struct ChannelMessage {
 45    pub id: ChannelMessageId,
 46    pub body: String,
 47    pub timestamp: OffsetDateTime,
 48    pub sender: Arc<User>,
 49    pub nonce: u128,
 50    pub mentions: Vec<(Range<usize>, UserId)>,
 51    pub reply_to_message_id: Option<u64>,
 52    pub edited_at: Option<OffsetDateTime>,
 53}
 54
 55#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 56pub enum ChannelMessageId {
 57    Saved(u64),
 58    Pending(usize),
 59}
 60
 61impl From<ChannelMessageId> for Option<u64> {
 62    fn from(val: ChannelMessageId) -> Self {
 63        match val {
 64            ChannelMessageId::Saved(id) => Some(id),
 65            ChannelMessageId::Pending(_) => None,
 66        }
 67    }
 68}
 69
 70#[derive(Clone, Debug, Default)]
 71pub struct ChannelMessageSummary {
 72    max_id: ChannelMessageId,
 73    count: usize,
 74}
 75
 76#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
 77struct Count(usize);
 78
 79#[derive(Clone, Debug, PartialEq)]
 80pub enum ChannelChatEvent {
 81    MessagesUpdated {
 82        old_range: Range<usize>,
 83        new_count: usize,
 84    },
 85    UpdateMessage {
 86        message_id: ChannelMessageId,
 87        message_ix: usize,
 88    },
 89    NewMessage {
 90        channel_id: ChannelId,
 91        message_id: u64,
 92    },
 93}
 94
 95impl EventEmitter<ChannelChatEvent> for ChannelChat {}
 96pub fn init(client: &AnyProtoClient) {
 97    client.add_entity_message_handler(ChannelChat::handle_message_sent);
 98    client.add_entity_message_handler(ChannelChat::handle_message_removed);
 99    client.add_entity_message_handler(ChannelChat::handle_message_updated);
100}
101
102impl ChannelChat {
103    pub async fn new(
104        channel: Arc<Channel>,
105        channel_store: Entity<ChannelStore>,
106        user_store: Entity<UserStore>,
107        client: Arc<Client>,
108        cx: &mut AsyncApp,
109    ) -> Result<Entity<Self>> {
110        let channel_id = channel.id;
111        let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
112
113        let response = client
114            .request(proto::JoinChannelChat {
115                channel_id: channel_id.0,
116            })
117            .await?;
118
119        let handle = cx.new(|cx| {
120            cx.on_release(Self::release).detach();
121            Self {
122                channel_id: channel.id,
123                user_store: user_store.clone(),
124                channel_store,
125                rpc: client.clone(),
126                outgoing_messages_lock: Default::default(),
127                messages: Default::default(),
128                acknowledged_message_ids: Default::default(),
129                loaded_all_messages: false,
130                next_pending_message_id: 0,
131                last_acknowledged_id: None,
132                rng: StdRng::from_entropy(),
133                first_loaded_message_id: None,
134                _subscription: subscription.set_entity(&cx.entity(), &cx.to_async()),
135            }
136        })?;
137        Self::handle_loaded_messages(
138            handle.downgrade(),
139            user_store,
140            client,
141            response.messages,
142            response.done,
143            cx,
144        )
145        .await?;
146        Ok(handle)
147    }
148
149    fn release(&mut self, _: &mut App) {
150        self.rpc
151            .send(proto::LeaveChannelChat {
152                channel_id: self.channel_id.0,
153            })
154            .log_err();
155    }
156
157    pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
158        self.channel_store
159            .read(cx)
160            .channel_for_id(self.channel_id)
161            .cloned()
162    }
163
164    pub fn client(&self) -> &Arc<Client> {
165        &self.rpc
166    }
167
168    pub fn send_message(
169        &mut self,
170        message: MessageParams,
171        cx: &mut Context<Self>,
172    ) -> Result<Task<Result<u64>>> {
173        if message.text.trim().is_empty() {
174            Err(anyhow!("message body can't be empty"))?;
175        }
176
177        let current_user = self
178            .user_store
179            .read(cx)
180            .current_user()
181            .ok_or_else(|| anyhow!("current_user is not present"))?;
182
183        let channel_id = self.channel_id;
184        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
185        let nonce = self.rng.r#gen();
186        self.insert_messages(
187            SumTree::from_item(
188                ChannelMessage {
189                    id: pending_id,
190                    body: message.text.clone(),
191                    sender: current_user,
192                    timestamp: OffsetDateTime::now_utc(),
193                    mentions: message.mentions.clone(),
194                    nonce,
195                    reply_to_message_id: message.reply_to_message_id,
196                    edited_at: None,
197                },
198                &(),
199            ),
200            cx,
201        );
202        let user_store = self.user_store.clone();
203        let rpc = self.rpc.clone();
204        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
205
206        // todo - handle messages that fail to send (e.g. >1024 chars)
207        Ok(cx.spawn(async move |this, cx| {
208            let outgoing_message_guard = outgoing_messages_lock.lock().await;
209            let request = rpc.request(proto::SendChannelMessage {
210                channel_id: channel_id.0,
211                body: message.text,
212                nonce: Some(nonce.into()),
213                mentions: mentions_to_proto(&message.mentions),
214                reply_to_message_id: message.reply_to_message_id,
215            });
216            let response = request.await?;
217            drop(outgoing_message_guard);
218            let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
219            let id = response.id;
220            let message = ChannelMessage::from_proto(response, &user_store, cx).await?;
221            this.update(cx, |this, cx| {
222                this.insert_messages(SumTree::from_item(message, &()), cx);
223                if this.first_loaded_message_id.is_none() {
224                    this.first_loaded_message_id = Some(id);
225                }
226            })?;
227            Ok(id)
228        }))
229    }
230
231    pub fn remove_message(&mut self, id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
232        let response = self.rpc.request(proto::RemoveChannelMessage {
233            channel_id: self.channel_id.0,
234            message_id: id,
235        });
236        cx.spawn(async move |this, cx| {
237            response.await?;
238            this.update(cx, |this, cx| {
239                this.message_removed(id, cx);
240            })?;
241            Ok(())
242        })
243    }
244
245    pub fn update_message(
246        &mut self,
247        id: u64,
248        message: MessageParams,
249        cx: &mut Context<Self>,
250    ) -> Result<Task<Result<()>>> {
251        self.message_update(
252            ChannelMessageId::Saved(id),
253            message.text.clone(),
254            message.mentions.clone(),
255            Some(OffsetDateTime::now_utc()),
256            cx,
257        );
258
259        let nonce: u128 = self.rng.r#gen();
260
261        let request = self.rpc.request(proto::UpdateChannelMessage {
262            channel_id: self.channel_id.0,
263            message_id: id,
264            body: message.text,
265            nonce: Some(nonce.into()),
266            mentions: mentions_to_proto(&message.mentions),
267        });
268        Ok(cx.spawn(async move |_, _| {
269            request.await?;
270            Ok(())
271        }))
272    }
273
274    pub fn load_more_messages(&mut self, cx: &mut Context<Self>) -> Option<Task<Option<()>>> {
275        if self.loaded_all_messages {
276            return None;
277        }
278
279        let rpc = self.rpc.clone();
280        let user_store = self.user_store.clone();
281        let channel_id = self.channel_id;
282        let before_message_id = self.first_loaded_message_id()?;
283        Some(cx.spawn(async move |this, cx| {
284            async move {
285                let response = rpc
286                    .request(proto::GetChannelMessages {
287                        channel_id: channel_id.0,
288                        before_message_id,
289                    })
290                    .await?;
291                Self::handle_loaded_messages(
292                    this,
293                    user_store,
294                    rpc,
295                    response.messages,
296                    response.done,
297                    cx,
298                )
299                .await?;
300
301                anyhow::Ok(())
302            }
303            .log_err()
304            .await
305        }))
306    }
307
308    pub fn first_loaded_message_id(&mut self) -> Option<u64> {
309        self.first_loaded_message_id
310    }
311
312    /// Load a message by its id, if it's already stored locally.
313    pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
314        self.messages.iter().find(|message| match message.id {
315            ChannelMessageId::Saved(message_id) => message_id == id,
316            ChannelMessageId::Pending(_) => false,
317        })
318    }
319
320    /// Load all of the chat messages since a certain message id.
321    ///
322    /// For now, we always maintain a suffix of the channel's messages.
323    pub async fn load_history_since_message(
324        chat: Entity<Self>,
325        message_id: u64,
326        mut cx: AsyncApp,
327    ) -> Option<usize> {
328        loop {
329            let step = chat
330                .update(&mut cx, |chat, cx| {
331                    if let Some(first_id) = chat.first_loaded_message_id() {
332                        if first_id <= message_id {
333                            let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>(&());
334                            let message_id = ChannelMessageId::Saved(message_id);
335                            cursor.seek(&message_id, Bias::Left, &());
336                            return ControlFlow::Break(
337                                if cursor
338                                    .item()
339                                    .map_or(false, |message| message.id == message_id)
340                                {
341                                    Some(cursor.start().1.0)
342                                } else {
343                                    None
344                                },
345                            );
346                        }
347                    }
348                    ControlFlow::Continue(chat.load_more_messages(cx))
349                })
350                .log_err()?;
351            match step {
352                ControlFlow::Break(ix) => return ix,
353                ControlFlow::Continue(task) => task?.await?,
354            }
355        }
356    }
357
358    pub fn acknowledge_last_message(&mut self, cx: &mut Context<Self>) {
359        if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
360            if self
361                .last_acknowledged_id
362                .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
363            {
364                self.rpc
365                    .send(proto::AckChannelMessage {
366                        channel_id: self.channel_id.0,
367                        message_id: latest_message_id,
368                    })
369                    .ok();
370                self.last_acknowledged_id = Some(latest_message_id);
371                self.channel_store.update(cx, |store, cx| {
372                    store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
373                });
374            }
375        }
376    }
377
378    async fn handle_loaded_messages(
379        this: WeakEntity<Self>,
380        user_store: Entity<UserStore>,
381        rpc: Arc<Client>,
382        proto_messages: Vec<proto::ChannelMessage>,
383        loaded_all_messages: bool,
384        cx: &mut AsyncApp,
385    ) -> Result<()> {
386        let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
387
388        let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
389        let loaded_message_ids = this.update(cx, |this, _| {
390            let mut loaded_message_ids: HashSet<u64> = HashSet::default();
391            for message in loaded_messages.iter() {
392                if let Some(saved_message_id) = message.id.into() {
393                    loaded_message_ids.insert(saved_message_id);
394                }
395            }
396            for message in this.messages.iter() {
397                if let Some(saved_message_id) = message.id.into() {
398                    loaded_message_ids.insert(saved_message_id);
399                }
400            }
401            loaded_message_ids
402        })?;
403
404        let missing_ancestors = loaded_messages
405            .iter()
406            .filter_map(|message| {
407                if let Some(ancestor_id) = message.reply_to_message_id {
408                    if !loaded_message_ids.contains(&ancestor_id) {
409                        return Some(ancestor_id);
410                    }
411                }
412                None
413            })
414            .collect::<Vec<_>>();
415
416        let loaded_ancestors = if missing_ancestors.is_empty() {
417            None
418        } else {
419            let response = rpc
420                .request(proto::GetChannelMessagesById {
421                    message_ids: missing_ancestors,
422                })
423                .await?;
424            Some(messages_from_proto(response.messages, &user_store, cx).await?)
425        };
426        this.update(cx, |this, cx| {
427            this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
428            this.loaded_all_messages = loaded_all_messages;
429            this.insert_messages(loaded_messages, cx);
430            if let Some(loaded_ancestors) = loaded_ancestors {
431                this.insert_messages(loaded_ancestors, cx);
432            }
433        })?;
434
435        Ok(())
436    }
437
438    pub fn rejoin(&mut self, cx: &mut Context<Self>) {
439        let user_store = self.user_store.clone();
440        let rpc = self.rpc.clone();
441        let channel_id = self.channel_id;
442        cx.spawn(async move |this, cx| {
443            async move {
444                let response = rpc
445                    .request(proto::JoinChannelChat {
446                        channel_id: channel_id.0,
447                    })
448                    .await?;
449                Self::handle_loaded_messages(
450                    this.clone(),
451                    user_store.clone(),
452                    rpc.clone(),
453                    response.messages,
454                    response.done,
455                    cx,
456                )
457                .await?;
458
459                let pending_messages = this.update(cx, |this, _| {
460                    this.pending_messages().cloned().collect::<Vec<_>>()
461                })?;
462
463                for pending_message in pending_messages {
464                    let request = rpc.request(proto::SendChannelMessage {
465                        channel_id: channel_id.0,
466                        body: pending_message.body,
467                        mentions: mentions_to_proto(&pending_message.mentions),
468                        nonce: Some(pending_message.nonce.into()),
469                        reply_to_message_id: pending_message.reply_to_message_id,
470                    });
471                    let response = request.await?;
472                    let message = ChannelMessage::from_proto(
473                        response.message.ok_or_else(|| anyhow!("invalid message"))?,
474                        &user_store,
475                        cx,
476                    )
477                    .await?;
478                    this.update(cx, |this, cx| {
479                        this.insert_messages(SumTree::from_item(message, &()), cx);
480                    })?;
481                }
482
483                anyhow::Ok(())
484            }
485            .log_err()
486            .await
487        })
488        .detach();
489    }
490
491    pub fn message_count(&self) -> usize {
492        self.messages.summary().count
493    }
494
495    pub fn messages(&self) -> &SumTree<ChannelMessage> {
496        &self.messages
497    }
498
499    pub fn message(&self, ix: usize) -> &ChannelMessage {
500        let mut cursor = self.messages.cursor::<Count>(&());
501        cursor.seek(&Count(ix), Bias::Right, &());
502        cursor.item().unwrap()
503    }
504
505    pub fn acknowledge_message(&mut self, id: u64) {
506        if self.acknowledged_message_ids.insert(id) {
507            self.rpc
508                .send(proto::AckChannelMessage {
509                    channel_id: self.channel_id.0,
510                    message_id: id,
511                })
512                .ok();
513        }
514    }
515
516    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
517        let mut cursor = self.messages.cursor::<Count>(&());
518        cursor.seek(&Count(range.start), Bias::Right, &());
519        cursor.take(range.len())
520    }
521
522    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
523        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
524        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
525        cursor
526    }
527
528    async fn handle_message_sent(
529        this: Entity<Self>,
530        message: TypedEnvelope<proto::ChannelMessageSent>,
531        mut cx: AsyncApp,
532    ) -> Result<()> {
533        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
534        let message = message
535            .payload
536            .message
537            .ok_or_else(|| anyhow!("empty message"))?;
538        let message_id = message.id;
539
540        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
541        this.update(&mut cx, |this, cx| {
542            this.insert_messages(SumTree::from_item(message, &()), cx);
543            cx.emit(ChannelChatEvent::NewMessage {
544                channel_id: this.channel_id,
545                message_id,
546            })
547        })?;
548
549        Ok(())
550    }
551
552    async fn handle_message_removed(
553        this: Entity<Self>,
554        message: TypedEnvelope<proto::RemoveChannelMessage>,
555        mut cx: AsyncApp,
556    ) -> Result<()> {
557        this.update(&mut cx, |this, cx| {
558            this.message_removed(message.payload.message_id, cx)
559        })?;
560        Ok(())
561    }
562
563    async fn handle_message_updated(
564        this: Entity<Self>,
565        message: TypedEnvelope<proto::ChannelMessageUpdate>,
566        mut cx: AsyncApp,
567    ) -> Result<()> {
568        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
569        let message = message
570            .payload
571            .message
572            .ok_or_else(|| anyhow!("empty message"))?;
573
574        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
575
576        this.update(&mut cx, |this, cx| {
577            this.message_update(
578                message.id,
579                message.body,
580                message.mentions,
581                message.edited_at,
582                cx,
583            )
584        })?;
585        Ok(())
586    }
587
588    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut Context<Self>) {
589        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
590            let nonces = messages
591                .cursor::<()>(&())
592                .map(|m| m.nonce)
593                .collect::<HashSet<_>>();
594
595            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(&());
596            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
597            let start_ix = old_cursor.start().1.0;
598            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
599            let removed_count = removed_messages.summary().count;
600            let new_count = messages.summary().count;
601            let end_ix = start_ix + removed_count;
602
603            new_messages.append(messages, &());
604
605            let mut ranges = Vec::<Range<usize>>::new();
606            if new_messages.last().unwrap().is_pending() {
607                new_messages.append(old_cursor.suffix(&()), &());
608            } else {
609                new_messages.append(
610                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
611                    &(),
612                );
613
614                while let Some(message) = old_cursor.item() {
615                    let message_ix = old_cursor.start().1.0;
616                    if nonces.contains(&message.nonce) {
617                        if ranges.last().map_or(false, |r| r.end == message_ix) {
618                            ranges.last_mut().unwrap().end += 1;
619                        } else {
620                            ranges.push(message_ix..message_ix + 1);
621                        }
622                    } else {
623                        new_messages.push(message.clone(), &());
624                    }
625                    old_cursor.next(&());
626                }
627            }
628
629            drop(old_cursor);
630            self.messages = new_messages;
631
632            for range in ranges.into_iter().rev() {
633                cx.emit(ChannelChatEvent::MessagesUpdated {
634                    old_range: range,
635                    new_count: 0,
636                });
637            }
638            cx.emit(ChannelChatEvent::MessagesUpdated {
639                old_range: start_ix..end_ix,
640                new_count,
641            });
642
643            cx.notify();
644        }
645    }
646
647    fn message_removed(&mut self, id: u64, cx: &mut Context<Self>) {
648        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
649        let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
650        if let Some(item) = cursor.item() {
651            if item.id == ChannelMessageId::Saved(id) {
652                let deleted_message_ix = messages.summary().count;
653                cursor.next(&());
654                messages.append(cursor.suffix(&()), &());
655                drop(cursor);
656                self.messages = messages;
657
658                // If the message that was deleted was the last acknowledged message,
659                // replace the acknowledged message with an earlier one.
660                self.channel_store.update(cx, |store, _| {
661                    let summary = self.messages.summary();
662                    if summary.count == 0 {
663                        store.set_acknowledged_message_id(self.channel_id, None);
664                    } else if deleted_message_ix == summary.count {
665                        if let ChannelMessageId::Saved(id) = summary.max_id {
666                            store.set_acknowledged_message_id(self.channel_id, Some(id));
667                        }
668                    }
669                });
670
671                cx.emit(ChannelChatEvent::MessagesUpdated {
672                    old_range: deleted_message_ix..deleted_message_ix + 1,
673                    new_count: 0,
674                });
675            }
676        }
677    }
678
679    fn message_update(
680        &mut self,
681        id: ChannelMessageId,
682        body: String,
683        mentions: Vec<(Range<usize>, u64)>,
684        edited_at: Option<OffsetDateTime>,
685        cx: &mut Context<Self>,
686    ) {
687        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
688        let mut messages = cursor.slice(&id, Bias::Left, &());
689        let ix = messages.summary().count;
690
691        if let Some(mut message_to_update) = cursor.item().cloned() {
692            message_to_update.body = body;
693            message_to_update.mentions = mentions;
694            message_to_update.edited_at = edited_at;
695            messages.push(message_to_update, &());
696            cursor.next(&());
697        }
698
699        messages.append(cursor.suffix(&()), &());
700        drop(cursor);
701        self.messages = messages;
702
703        cx.emit(ChannelChatEvent::UpdateMessage {
704            message_ix: ix,
705            message_id: id,
706        });
707
708        cx.notify();
709    }
710}
711
712async fn messages_from_proto(
713    proto_messages: Vec<proto::ChannelMessage>,
714    user_store: &Entity<UserStore>,
715    cx: &mut AsyncApp,
716) -> Result<SumTree<ChannelMessage>> {
717    let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
718    let mut result = SumTree::default();
719    result.extend(messages, &());
720    Ok(result)
721}
722
723impl ChannelMessage {
724    pub async fn from_proto(
725        message: proto::ChannelMessage,
726        user_store: &Entity<UserStore>,
727        cx: &mut AsyncApp,
728    ) -> Result<Self> {
729        let sender = user_store
730            .update(cx, |user_store, cx| {
731                user_store.get_user(message.sender_id, cx)
732            })?
733            .await?;
734
735        let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
736            if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
737                return Some(a);
738            }
739
740            None
741        });
742
743        Ok(ChannelMessage {
744            id: ChannelMessageId::Saved(message.id),
745            body: message.body,
746            mentions: message
747                .mentions
748                .into_iter()
749                .filter_map(|mention| {
750                    let range = mention.range?;
751                    Some((range.start as usize..range.end as usize, mention.user_id))
752                })
753                .collect(),
754            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
755            sender,
756            nonce: message
757                .nonce
758                .ok_or_else(|| anyhow!("nonce is required"))?
759                .into(),
760            reply_to_message_id: message.reply_to_message_id,
761            edited_at,
762        })
763    }
764
765    pub fn is_pending(&self) -> bool {
766        matches!(self.id, ChannelMessageId::Pending(_))
767    }
768
769    pub async fn from_proto_vec(
770        proto_messages: Vec<proto::ChannelMessage>,
771        user_store: &Entity<UserStore>,
772        cx: &mut AsyncApp,
773    ) -> Result<Vec<Self>> {
774        let unique_user_ids = proto_messages
775            .iter()
776            .map(|m| m.sender_id)
777            .collect::<HashSet<_>>()
778            .into_iter()
779            .collect();
780        user_store
781            .update(cx, |user_store, cx| {
782                user_store.get_users(unique_user_ids, cx)
783            })?
784            .await?;
785
786        let mut messages = Vec::with_capacity(proto_messages.len());
787        for message in proto_messages {
788            messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
789        }
790        Ok(messages)
791    }
792}
793
794pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
795    mentions
796        .iter()
797        .map(|(range, user_id)| proto::ChatMention {
798            range: Some(proto::Range {
799                start: range.start as u64,
800                end: range.end as u64,
801            }),
802            user_id: *user_id,
803        })
804        .collect()
805}
806
807impl sum_tree::Item for ChannelMessage {
808    type Summary = ChannelMessageSummary;
809
810    fn summary(&self, _cx: &()) -> Self::Summary {
811        ChannelMessageSummary {
812            max_id: self.id,
813            count: 1,
814        }
815    }
816}
817
818impl Default for ChannelMessageId {
819    fn default() -> Self {
820        Self::Saved(0)
821    }
822}
823
824impl sum_tree::Summary for ChannelMessageSummary {
825    type Context = ();
826
827    fn zero(_cx: &Self::Context) -> Self {
828        Default::default()
829    }
830
831    fn add_summary(&mut self, summary: &Self, _: &()) {
832        self.max_id = summary.max_id;
833        self.count += summary.count;
834    }
835}
836
837impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
838    fn zero(_cx: &()) -> Self {
839        Default::default()
840    }
841
842    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
843        debug_assert!(summary.max_id > *self);
844        *self = summary.max_id;
845    }
846}
847
848impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
849    fn zero(_cx: &()) -> Self {
850        Default::default()
851    }
852
853    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
854        self.0 += summary.count;
855    }
856}
857
858impl<'a> From<&'a str> for MessageParams {
859    fn from(value: &'a str) -> Self {
860        Self {
861            text: value.into(),
862            mentions: Vec::new(),
863            reply_to_message_id: None,
864        }
865    }
866}