channel_chat.rs

  1use crate::{Channel, ChannelStore};
  2use anyhow::{anyhow, Result};
  3use client::{
  4    proto,
  5    user::{User, UserStore},
  6    ChannelId, Client, Subscription, TypedEnvelope, UserId,
  7};
  8use collections::HashSet;
  9use futures::lock::Mutex;
 10use gpui::{
 11    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
 12};
 13use rand::prelude::*;
 14use rpc::AnyProtoClient;
 15use std::{
 16    ops::{ControlFlow, Range},
 17    sync::Arc,
 18};
 19use sum_tree::{Bias, SumTree};
 20use time::OffsetDateTime;
 21use util::{post_inc, ResultExt as _, TryFutureExt};
 22
/// Client-side state for a single channel's chat: the messages loaded so far
/// plus the handles used to talk to the server and to the channel/user stores.
pub struct ChannelChat {
    /// Id of the channel this chat belongs to.
    pub channel_id: ChannelId,
    /// Loaded messages, both saved and still-pending ones.
    messages: SumTree<ChannelMessage>,
    /// Ids already passed to `acknowledge_message`, so each one is sent at most once.
    acknowledged_message_ids: HashSet<u64>,
    /// Store used to look up the channel and to record acknowledged message ids.
    channel_store: Model<ChannelStore>,
    /// Copied from the `done` flag of the most recent message-loading response.
    loaded_all_messages: bool,
    /// Last id sent by `acknowledge_last_message`, if any.
    last_acknowledged_id: Option<u64>,
    /// Counter used to mint `ChannelMessageId::Pending` ids.
    next_pending_message_id: usize,
    /// Saved id of the first message in the most recently loaded batch; used as
    /// the `before_message_id` when loading older messages.
    first_loaded_message_id: Option<u64>,
    /// Store used to resolve the current user and message senders.
    user_store: Model<UserStore>,
    /// Client used for all requests and notifications sent by this chat.
    rpc: Arc<Client>,
    /// Held by `send_message` while its request is in flight.
    outgoing_messages_lock: Arc<Mutex<()>>,
    /// Source of the random nonces attached to outgoing messages.
    rng: StdRng,
    /// Entity subscription created in `new`; stored but never read.
    _subscription: Subscription,
}
 38
/// The caller-supplied parts of a message that is being sent or edited.
#[derive(Debug, PartialEq, Eq)]
pub struct MessageParams {
    /// Message body.
    pub text: String,
    /// Mentioned users, each paired with a range.
    // NOTE(review): the ranges presumably index into `text` — confirm units (bytes vs chars).
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message being replied to, if any.
    pub reply_to_message_id: Option<u64>,
}
 45
/// A single chat message, either confirmed by the server or still pending locally.
#[derive(Clone, Debug)]
pub struct ChannelMessage {
    /// `Saved` for messages built from a server payload, `Pending` for local ones.
    pub id: ChannelMessageId,
    /// Message text.
    pub body: String,
    /// Creation time; for pending messages this is the local time at send.
    pub timestamp: OffsetDateTime,
    /// User who sent the message.
    pub sender: Arc<User>,
    /// Random value generated when sending; used to match a pending message
    /// with its saved counterpart.
    pub nonce: u128,
    /// Mentioned users, each paired with a range.
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message this one replies to, if any.
    pub reply_to_message_id: Option<u64>,
    /// Time of the last edit, if the message was edited.
    pub edited_at: Option<OffsetDateTime>,
}
 57
 58#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 59pub enum ChannelMessageId {
 60    Saved(u64),
 61    Pending(usize),
 62}
 63
 64impl From<ChannelMessageId> for Option<u64> {
 65    fn from(val: ChannelMessageId) -> Self {
 66        match val {
 67            ChannelMessageId::Saved(id) => Some(id),
 68            ChannelMessageId::Pending(_) => None,
 69        }
 70    }
 71}
 72
/// Summary stored in the message tree for a run of messages.
#[derive(Clone, Debug, Default)]
pub struct ChannelMessageSummary {
    /// Id of the last message in the summarized run.
    max_id: ChannelMessageId,
    /// Number of messages in the summarized run.
    count: usize,
}
 78
/// Tree dimension that counts messages, used to seek by message index.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct Count(usize);
 81
/// Events emitted by [`ChannelChat`].
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelChatEvent {
    /// The messages at indices `old_range` were replaced by `new_count` messages
    /// (a `new_count` of 0 means they were removed).
    MessagesUpdated {
        old_range: Range<usize>,
        new_count: usize,
    },
    /// The content of one message was edited in place.
    UpdateMessage {
        message_id: ChannelMessageId,
        message_ix: usize,
    },
    /// A `ChannelMessageSent` notification for this channel was received and
    /// its message inserted.
    NewMessage {
        channel_id: ChannelId,
        message_id: u64,
    },
}
 97
// Lets `ChannelChat` emit `ChannelChatEvent`s through its model context.
impl EventEmitter<ChannelChatEvent> for ChannelChat {}
/// Registers the `ChannelChat` handlers for the sent, removed and updated
/// message notifications on `client`.
pub fn init(client: &AnyProtoClient) {
    client.add_model_message_handler(ChannelChat::handle_message_sent);
    client.add_model_message_handler(ChannelChat::handle_message_removed);
    client.add_model_message_handler(ChannelChat::handle_message_updated);
}
104
105impl ChannelChat {
106    pub async fn new(
107        channel: Arc<Channel>,
108        channel_store: Model<ChannelStore>,
109        user_store: Model<UserStore>,
110        client: Arc<Client>,
111        mut cx: AsyncAppContext,
112    ) -> Result<Model<Self>> {
113        let channel_id = channel.id;
114        let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
115
116        let response = client
117            .request(proto::JoinChannelChat {
118                channel_id: channel_id.0,
119            })
120            .await?;
121
122        let handle = cx.new_model(|cx| {
123            cx.on_release(Self::release).detach();
124            Self {
125                channel_id: channel.id,
126                user_store: user_store.clone(),
127                channel_store,
128                rpc: client.clone(),
129                outgoing_messages_lock: Default::default(),
130                messages: Default::default(),
131                acknowledged_message_ids: Default::default(),
132                loaded_all_messages: false,
133                next_pending_message_id: 0,
134                last_acknowledged_id: None,
135                rng: StdRng::from_entropy(),
136                first_loaded_message_id: None,
137                _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
138            }
139        })?;
140        Self::handle_loaded_messages(
141            handle.downgrade(),
142            user_store,
143            client,
144            response.messages,
145            response.done,
146            &mut cx,
147        )
148        .await?;
149        Ok(handle)
150    }
151
152    fn release(&mut self, _: &mut AppContext) {
153        self.rpc
154            .send(proto::LeaveChannelChat {
155                channel_id: self.channel_id.0,
156            })
157            .log_err();
158    }
159
160    pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
161        self.channel_store
162            .read(cx)
163            .channel_for_id(self.channel_id)
164            .cloned()
165    }
166
    /// Returns the client this chat uses for its requests.
    pub fn client(&self) -> &Arc<Client> {
        &self.rpc
    }
170
171    pub fn send_message(
172        &mut self,
173        message: MessageParams,
174        cx: &mut ModelContext<Self>,
175    ) -> Result<Task<Result<u64>>> {
176        if message.text.trim().is_empty() {
177            Err(anyhow!("message body can't be empty"))?;
178        }
179
180        let current_user = self
181            .user_store
182            .read(cx)
183            .current_user()
184            .ok_or_else(|| anyhow!("current_user is not present"))?;
185
186        let channel_id = self.channel_id;
187        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
188        let nonce = self.rng.gen();
189        self.insert_messages(
190            SumTree::from_item(
191                ChannelMessage {
192                    id: pending_id,
193                    body: message.text.clone(),
194                    sender: current_user,
195                    timestamp: OffsetDateTime::now_utc(),
196                    mentions: message.mentions.clone(),
197                    nonce,
198                    reply_to_message_id: message.reply_to_message_id,
199                    edited_at: None,
200                },
201                &(),
202            ),
203            cx,
204        );
205        let user_store = self.user_store.clone();
206        let rpc = self.rpc.clone();
207        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
208
209        // todo - handle messages that fail to send (e.g. >1024 chars)
210        Ok(cx.spawn(move |this, mut cx| async move {
211            let outgoing_message_guard = outgoing_messages_lock.lock().await;
212            let request = rpc.request(proto::SendChannelMessage {
213                channel_id: channel_id.0,
214                body: message.text,
215                nonce: Some(nonce.into()),
216                mentions: mentions_to_proto(&message.mentions),
217                reply_to_message_id: message.reply_to_message_id,
218            });
219            let response = request.await?;
220            drop(outgoing_message_guard);
221            let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
222            let id = response.id;
223            let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
224            this.update(&mut cx, |this, cx| {
225                this.insert_messages(SumTree::from_item(message, &()), cx);
226                if this.first_loaded_message_id.is_none() {
227                    this.first_loaded_message_id = Some(id);
228                }
229            })?;
230            Ok(id)
231        }))
232    }
233
234    pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
235        let response = self.rpc.request(proto::RemoveChannelMessage {
236            channel_id: self.channel_id.0,
237            message_id: id,
238        });
239        cx.spawn(move |this, mut cx| async move {
240            response.await?;
241            this.update(&mut cx, |this, cx| {
242                this.message_removed(id, cx);
243            })?;
244            Ok(())
245        })
246    }
247
248    pub fn update_message(
249        &mut self,
250        id: u64,
251        message: MessageParams,
252        cx: &mut ModelContext<Self>,
253    ) -> Result<Task<Result<()>>> {
254        self.message_update(
255            ChannelMessageId::Saved(id),
256            message.text.clone(),
257            message.mentions.clone(),
258            Some(OffsetDateTime::now_utc()),
259            cx,
260        );
261
262        let nonce: u128 = self.rng.gen();
263
264        let request = self.rpc.request(proto::UpdateChannelMessage {
265            channel_id: self.channel_id.0,
266            message_id: id,
267            body: message.text,
268            nonce: Some(nonce.into()),
269            mentions: mentions_to_proto(&message.mentions),
270        });
271        Ok(cx.spawn(move |_, _| async move {
272            request.await?;
273            Ok(())
274        }))
275    }
276
277    pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Option<()>>> {
278        if self.loaded_all_messages {
279            return None;
280        }
281
282        let rpc = self.rpc.clone();
283        let user_store = self.user_store.clone();
284        let channel_id = self.channel_id;
285        let before_message_id = self.first_loaded_message_id()?;
286        Some(cx.spawn(move |this, mut cx| {
287            async move {
288                let response = rpc
289                    .request(proto::GetChannelMessages {
290                        channel_id: channel_id.0,
291                        before_message_id,
292                    })
293                    .await?;
294                Self::handle_loaded_messages(
295                    this,
296                    user_store,
297                    rpc,
298                    response.messages,
299                    response.done,
300                    &mut cx,
301                )
302                .await?;
303
304                anyhow::Ok(())
305            }
306            .log_err()
307        }))
308    }
309
310    pub fn first_loaded_message_id(&mut self) -> Option<u64> {
311        self.first_loaded_message_id
312    }
313
314    /// Load a message by its id, if it's already stored locally.
315    pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
316        self.messages.iter().find(|message| match message.id {
317            ChannelMessageId::Saved(message_id) => message_id == id,
318            ChannelMessageId::Pending(_) => false,
319        })
320    }
321
322    /// Load all of the chat messages since a certain message id.
323    ///
324    /// For now, we always maintain a suffix of the channel's messages.
325    pub async fn load_history_since_message(
326        chat: Model<Self>,
327        message_id: u64,
328        mut cx: AsyncAppContext,
329    ) -> Option<usize> {
330        loop {
331            let step = chat
332                .update(&mut cx, |chat, cx| {
333                    if let Some(first_id) = chat.first_loaded_message_id() {
334                        if first_id <= message_id {
335                            let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>(&());
336                            let message_id = ChannelMessageId::Saved(message_id);
337                            cursor.seek(&message_id, Bias::Left, &());
338                            return ControlFlow::Break(
339                                if cursor
340                                    .item()
341                                    .map_or(false, |message| message.id == message_id)
342                                {
343                                    Some(cursor.start().1 .0)
344                                } else {
345                                    None
346                                },
347                            );
348                        }
349                    }
350                    ControlFlow::Continue(chat.load_more_messages(cx))
351                })
352                .log_err()?;
353            match step {
354                ControlFlow::Break(ix) => return ix,
355                ControlFlow::Continue(task) => task?.await?,
356            }
357        }
358    }
359
360    pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext<Self>) {
361        if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
362            if self
363                .last_acknowledged_id
364                .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
365            {
366                self.rpc
367                    .send(proto::AckChannelMessage {
368                        channel_id: self.channel_id.0,
369                        message_id: latest_message_id,
370                    })
371                    .ok();
372                self.last_acknowledged_id = Some(latest_message_id);
373                self.channel_store.update(cx, |store, cx| {
374                    store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
375                });
376            }
377        }
378    }
379
380    async fn handle_loaded_messages(
381        this: WeakModel<Self>,
382        user_store: Model<UserStore>,
383        rpc: Arc<Client>,
384        proto_messages: Vec<proto::ChannelMessage>,
385        loaded_all_messages: bool,
386        cx: &mut AsyncAppContext,
387    ) -> Result<()> {
388        let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
389
390        let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
391        let loaded_message_ids = this.update(cx, |this, _| {
392            let mut loaded_message_ids: HashSet<u64> = HashSet::default();
393            for message in loaded_messages.iter() {
394                if let Some(saved_message_id) = message.id.into() {
395                    loaded_message_ids.insert(saved_message_id);
396                }
397            }
398            for message in this.messages.iter() {
399                if let Some(saved_message_id) = message.id.into() {
400                    loaded_message_ids.insert(saved_message_id);
401                }
402            }
403            loaded_message_ids
404        })?;
405
406        let missing_ancestors = loaded_messages
407            .iter()
408            .filter_map(|message| {
409                if let Some(ancestor_id) = message.reply_to_message_id {
410                    if !loaded_message_ids.contains(&ancestor_id) {
411                        return Some(ancestor_id);
412                    }
413                }
414                None
415            })
416            .collect::<Vec<_>>();
417
418        let loaded_ancestors = if missing_ancestors.is_empty() {
419            None
420        } else {
421            let response = rpc
422                .request(proto::GetChannelMessagesById {
423                    message_ids: missing_ancestors,
424                })
425                .await?;
426            Some(messages_from_proto(response.messages, &user_store, cx).await?)
427        };
428        this.update(cx, |this, cx| {
429            this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
430            this.loaded_all_messages = loaded_all_messages;
431            this.insert_messages(loaded_messages, cx);
432            if let Some(loaded_ancestors) = loaded_ancestors {
433                this.insert_messages(loaded_ancestors, cx);
434            }
435        })?;
436
437        Ok(())
438    }
439
    /// Re-joins the channel chat in a detached background task.
    ///
    /// The task sends `JoinChannelChat` again, inserts the messages from the
    /// response, and then re-sends every message that is still pending, one at
    /// a time, inserting each saved message the server returns. The first
    /// failure stops the task and is logged.
    pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
        let user_store = self.user_store.clone();
        let rpc = self.rpc.clone();
        let channel_id = self.channel_id;
        cx.spawn(move |this, mut cx| {
            async move {
                let response = rpc
                    .request(proto::JoinChannelChat {
                        channel_id: channel_id.0,
                    })
                    .await?;
                Self::handle_loaded_messages(
                    this.clone(),
                    user_store.clone(),
                    rpc.clone(),
                    response.messages,
                    response.done,
                    &mut cx,
                )
                .await?;

                // Snapshot the pending messages after the join response has
                // been inserted.
                let pending_messages = this.update(&mut cx, |this, _| {
                    this.pending_messages().cloned().collect::<Vec<_>>()
                })?;

                for pending_message in pending_messages {
                    // Re-send with the original nonce so the saved message can
                    // be matched with the pending one on insertion.
                    let request = rpc.request(proto::SendChannelMessage {
                        channel_id: channel_id.0,
                        body: pending_message.body,
                        mentions: mentions_to_proto(&pending_message.mentions),
                        nonce: Some(pending_message.nonce.into()),
                        reply_to_message_id: pending_message.reply_to_message_id,
                    });
                    let response = request.await?;
                    let message = ChannelMessage::from_proto(
                        response.message.ok_or_else(|| anyhow!("invalid message"))?,
                        &user_store,
                        &mut cx,
                    )
                    .await?;
                    this.update(&mut cx, |this, cx| {
                        this.insert_messages(SumTree::from_item(message, &()), cx);
                    })?;
                }

                anyhow::Ok(())
            }
            .log_err()
        })
        .detach();
    }
491
    /// Number of loaded messages, pending ones included.
    pub fn message_count(&self) -> usize {
        self.messages.summary().count
    }
495
    /// Returns the tree holding all loaded messages.
    pub fn messages(&self) -> &SumTree<ChannelMessage> {
        &self.messages
    }
499
    /// Returns the message at index `ix`.
    ///
    /// # Panics
    ///
    /// Panics if the cursor has no item after seeking to `ix`.
    // NOTE(review): presumably this happens when `ix >= message_count()` — confirm.
    pub fn message(&self, ix: usize) -> &ChannelMessage {
        let mut cursor = self.messages.cursor::<Count>(&());
        cursor.seek(&Count(ix), Bias::Right, &());
        cursor.item().unwrap()
    }
505
506    pub fn acknowledge_message(&mut self, id: u64) {
507        if self.acknowledged_message_ids.insert(id) {
508            self.rpc
509                .send(proto::AckChannelMessage {
510                    channel_id: self.channel_id.0,
511                    message_id: id,
512                })
513                .ok();
514        }
515    }
516
    /// Iterates over at most `range.len()` messages, starting at index `range.start`.
    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
        let mut cursor = self.messages.cursor::<Count>(&());
        cursor.seek(&Count(range.start), Bias::Right, &());
        cursor.take(range.len())
    }
522
    /// Iterates over the messages from `ChannelMessageId::Pending(0)` onwards.
    /// Since every `Pending` id orders after every `Saved` id, these are the
    /// pending messages.
    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
        cursor
    }
528
529    async fn handle_message_sent(
530        this: Model<Self>,
531        message: TypedEnvelope<proto::ChannelMessageSent>,
532        mut cx: AsyncAppContext,
533    ) -> Result<()> {
534        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
535        let message = message
536            .payload
537            .message
538            .ok_or_else(|| anyhow!("empty message"))?;
539        let message_id = message.id;
540
541        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
542        this.update(&mut cx, |this, cx| {
543            this.insert_messages(SumTree::from_item(message, &()), cx);
544            cx.emit(ChannelChatEvent::NewMessage {
545                channel_id: this.channel_id,
546                message_id,
547            })
548        })?;
549
550        Ok(())
551    }
552
553    async fn handle_message_removed(
554        this: Model<Self>,
555        message: TypedEnvelope<proto::RemoveChannelMessage>,
556        mut cx: AsyncAppContext,
557    ) -> Result<()> {
558        this.update(&mut cx, |this, cx| {
559            this.message_removed(message.payload.message_id, cx)
560        })?;
561        Ok(())
562    }
563
564    async fn handle_message_updated(
565        this: Model<Self>,
566        message: TypedEnvelope<proto::ChannelMessageUpdate>,
567        mut cx: AsyncAppContext,
568    ) -> Result<()> {
569        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
570        let message = message
571            .payload
572            .message
573            .ok_or_else(|| anyhow!("empty message"))?;
574
575        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
576
577        this.update(&mut cx, |this, cx| {
578            this.message_update(
579                message.id,
580                message.body,
581                message.mentions,
582                message.edited_at,
583                cx,
584            )
585        })?;
586        Ok(())
587    }
588
    /// Merges `messages` into the loaded messages and emits the matching
    /// `MessagesUpdated` events. Does nothing when `messages` is empty.
    ///
    /// Existing messages whose ids fall within the id range covered by
    /// `messages` are replaced by the new ones. If the inserted batch ends with
    /// a saved message, existing pending messages that share a nonce with an
    /// inserted message are dropped as well.
    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
            // Nonces of the incoming messages, used below to recognise pending
            // messages that are superseded by this batch.
            let nonces = messages
                .cursor::<()>(&())
                .map(|m| m.nonce)
                .collect::<HashSet<_>>();

            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(&());
            // Existing messages that come before the incoming batch are kept.
            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
            let start_ix = old_cursor.start().1 .0;
            // Existing messages inside the batch's id range are replaced.
            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
            let removed_count = removed_messages.summary().count;
            let new_count = messages.summary().count;
            let end_ix = start_ix + removed_count;

            new_messages.append(messages, &());

            // Index ranges (in the old tree) of pending messages that get dropped.
            let mut ranges = Vec::<Range<usize>>::new();
            if new_messages.last().unwrap().is_pending() {
                // The batch ended with a pending message: keep the rest as is.
                new_messages.append(old_cursor.suffix(&()), &());
            } else {
                // Keep the remaining saved messages, up to the first pending one.
                new_messages.append(
                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
                    &(),
                );

                // Walk the remaining (pending) messages, dropping those whose
                // nonce matches an incoming message and keeping the others.
                while let Some(message) = old_cursor.item() {
                    let message_ix = old_cursor.start().1 .0;
                    if nonces.contains(&message.nonce) {
                        // Coalesce adjacent dropped indices into one range.
                        if ranges.last().map_or(false, |r| r.end == message_ix) {
                            ranges.last_mut().unwrap().end += 1;
                        } else {
                            ranges.push(message_ix..message_ix + 1);
                        }
                    } else {
                        new_messages.push(message.clone(), &());
                    }
                    old_cursor.next(&());
                }
            }

            drop(old_cursor);
            self.messages = new_messages;

            // Report the dropped pending ranges from the highest index down,
            // then the replacement itself.
            for range in ranges.into_iter().rev() {
                cx.emit(ChannelChatEvent::MessagesUpdated {
                    old_range: range,
                    new_count: 0,
                });
            }
            cx.emit(ChannelChatEvent::MessagesUpdated {
                old_range: start_ix..end_ix,
                new_count,
            });

            cx.notify();
        }
    }
647
648    fn message_removed(&mut self, id: u64, cx: &mut ModelContext<Self>) {
649        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
650        let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
651        if let Some(item) = cursor.item() {
652            if item.id == ChannelMessageId::Saved(id) {
653                let deleted_message_ix = messages.summary().count;
654                cursor.next(&());
655                messages.append(cursor.suffix(&()), &());
656                drop(cursor);
657                self.messages = messages;
658
659                // If the message that was deleted was the last acknowledged message,
660                // replace the acknowledged message with an earlier one.
661                self.channel_store.update(cx, |store, _| {
662                    let summary = self.messages.summary();
663                    if summary.count == 0 {
664                        store.set_acknowledged_message_id(self.channel_id, None);
665                    } else if deleted_message_ix == summary.count {
666                        if let ChannelMessageId::Saved(id) = summary.max_id {
667                            store.set_acknowledged_message_id(self.channel_id, Some(id));
668                        }
669                    }
670                });
671
672                cx.emit(ChannelChatEvent::MessagesUpdated {
673                    old_range: deleted_message_ix..deleted_message_ix + 1,
674                    new_count: 0,
675                });
676            }
677        }
678    }
679
680    fn message_update(
681        &mut self,
682        id: ChannelMessageId,
683        body: String,
684        mentions: Vec<(Range<usize>, u64)>,
685        edited_at: Option<OffsetDateTime>,
686        cx: &mut ModelContext<Self>,
687    ) {
688        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
689        let mut messages = cursor.slice(&id, Bias::Left, &());
690        let ix = messages.summary().count;
691
692        if let Some(mut message_to_update) = cursor.item().cloned() {
693            message_to_update.body = body;
694            message_to_update.mentions = mentions;
695            message_to_update.edited_at = edited_at;
696            messages.push(message_to_update, &());
697            cursor.next(&());
698        }
699
700        messages.append(cursor.suffix(&()), &());
701        drop(cursor);
702        self.messages = messages;
703
704        cx.emit(ChannelChatEvent::UpdateMessage {
705            message_ix: ix,
706            message_id: id,
707        });
708
709        cx.notify();
710    }
711}
712
713async fn messages_from_proto(
714    proto_messages: Vec<proto::ChannelMessage>,
715    user_store: &Model<UserStore>,
716    cx: &mut AsyncAppContext,
717) -> Result<SumTree<ChannelMessage>> {
718    let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
719    let mut result = SumTree::default();
720    result.extend(messages, &());
721    Ok(result)
722}
723
724impl ChannelMessage {
725    pub async fn from_proto(
726        message: proto::ChannelMessage,
727        user_store: &Model<UserStore>,
728        cx: &mut AsyncAppContext,
729    ) -> Result<Self> {
730        let sender = user_store
731            .update(cx, |user_store, cx| {
732                user_store.get_user(message.sender_id, cx)
733            })?
734            .await?;
735
736        let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
737            if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
738                return Some(a);
739            }
740
741            None
742        });
743
744        Ok(ChannelMessage {
745            id: ChannelMessageId::Saved(message.id),
746            body: message.body,
747            mentions: message
748                .mentions
749                .into_iter()
750                .filter_map(|mention| {
751                    let range = mention.range?;
752                    Some((range.start as usize..range.end as usize, mention.user_id))
753                })
754                .collect(),
755            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
756            sender,
757            nonce: message
758                .nonce
759                .ok_or_else(|| anyhow!("nonce is required"))?
760                .into(),
761            reply_to_message_id: message.reply_to_message_id,
762            edited_at,
763        })
764    }
765
766    pub fn is_pending(&self) -> bool {
767        matches!(self.id, ChannelMessageId::Pending(_))
768    }
769
770    pub async fn from_proto_vec(
771        proto_messages: Vec<proto::ChannelMessage>,
772        user_store: &Model<UserStore>,
773        cx: &mut AsyncAppContext,
774    ) -> Result<Vec<Self>> {
775        let unique_user_ids = proto_messages
776            .iter()
777            .map(|m| m.sender_id)
778            .collect::<HashSet<_>>()
779            .into_iter()
780            .collect();
781        user_store
782            .update(cx, |user_store, cx| {
783                user_store.get_users(unique_user_ids, cx)
784            })?
785            .await?;
786
787        let mut messages = Vec::with_capacity(proto_messages.len());
788        for message in proto_messages {
789            messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
790        }
791        Ok(messages)
792    }
793}
794
795pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
796    mentions
797        .iter()
798        .map(|(range, user_id)| proto::ChatMention {
799            range: Some(proto::Range {
800                start: range.start as u64,
801                end: range.end as u64,
802            }),
803            user_id: *user_id,
804        })
805        .collect()
806}
807
/// A single message summarizes to its own id and a count of one.
impl sum_tree::Item for ChannelMessage {
    type Summary = ChannelMessageSummary;

    fn summary(&self, _cx: &()) -> Self::Summary {
        ChannelMessageSummary {
            max_id: self.id,
            count: 1,
        }
    }
}
818
/// The default id is `Saved(0)`; it serves as the zero value of the summary
/// and of the id dimension.
impl Default for ChannelMessageId {
    fn default() -> Self {
        Self::Saved(0)
    }
}
824
impl sum_tree::Summary for ChannelMessageSummary {
    type Context = ();

    /// The empty summary: default id and a count of zero.
    fn zero(_cx: &Self::Context) -> Self {
        Default::default()
    }

    /// Appends `summary`: its `max_id` replaces ours and the counts add up.
    fn add_summary(&mut self, summary: &Self, _: &()) {
        self.max_id = summary.max_id;
        self.count += summary.count;
    }
}
837
/// Lets cursors over the message tree seek by message id.
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
    fn zero(_cx: &()) -> Self {
        Default::default()
    }

    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
        // Debug builds check that ids strictly increase along the tree.
        debug_assert!(summary.max_id > *self);
        *self = summary.max_id;
    }
}
848
/// Lets cursors over the message tree seek by message index.
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
    fn zero(_cx: &()) -> Self {
        Default::default()
    }

    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
        self.0 += summary.count;
    }
}
858
859impl<'a> From<&'a str> for MessageParams {
860    fn from(value: &'a str) -> Self {
861        Self {
862            text: value.into(),
863            mentions: Vec::new(),
864            reply_to_message_id: None,
865        }
866    }
867}