channel_chat.rs

  1use crate::{Channel, ChannelStore};
  2use anyhow::{Context as _, Result};
  3use client::{
  4    ChannelId, Client, Subscription, TypedEnvelope, UserId, proto,
  5    user::{User, UserStore},
  6};
  7use collections::HashSet;
  8use futures::lock::Mutex;
  9use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
 10use rand::prelude::*;
 11use rpc::AnyProtoClient;
 12use std::{
 13    ops::{ControlFlow, Range},
 14    sync::Arc,
 15};
 16use sum_tree::{Bias, Dimensions, SumTree};
 17use time::OffsetDateTime;
 18use util::{ResultExt as _, TryFutureExt, post_inc};
 19
 20pub struct ChannelChat {
 21    pub channel_id: ChannelId,
 22    messages: SumTree<ChannelMessage>,
 23    acknowledged_message_ids: HashSet<u64>,
 24    channel_store: Entity<ChannelStore>,
 25    loaded_all_messages: bool,
 26    last_acknowledged_id: Option<u64>,
 27    next_pending_message_id: usize,
 28    first_loaded_message_id: Option<u64>,
 29    user_store: Entity<UserStore>,
 30    rpc: Arc<Client>,
 31    outgoing_messages_lock: Arc<Mutex<()>>,
 32    rng: StdRng,
 33    _subscription: Subscription,
 34}
 35
 36#[derive(Debug, PartialEq, Eq)]
 37pub struct MessageParams {
 38    pub text: String,
 39    pub mentions: Vec<(Range<usize>, UserId)>,
 40    pub reply_to_message_id: Option<u64>,
 41}
 42
 43#[derive(Clone, Debug)]
 44pub struct ChannelMessage {
 45    pub id: ChannelMessageId,
 46    pub body: String,
 47    pub timestamp: OffsetDateTime,
 48    pub sender: Arc<User>,
 49    pub nonce: u128,
 50    pub mentions: Vec<(Range<usize>, UserId)>,
 51    pub reply_to_message_id: Option<u64>,
 52    pub edited_at: Option<OffsetDateTime>,
 53}
 54
 55#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 56pub enum ChannelMessageId {
 57    Saved(u64),
 58    Pending(usize),
 59}
 60
 61impl From<ChannelMessageId> for Option<u64> {
 62    fn from(val: ChannelMessageId) -> Self {
 63        match val {
 64            ChannelMessageId::Saved(id) => Some(id),
 65            ChannelMessageId::Pending(_) => None,
 66        }
 67    }
 68}
 69
 70#[derive(Clone, Debug, Default)]
 71pub struct ChannelMessageSummary {
 72    max_id: ChannelMessageId,
 73    count: usize,
 74}
 75
 76#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
 77struct Count(usize);
 78
 79#[derive(Clone, Debug, PartialEq)]
 80pub enum ChannelChatEvent {
 81    MessagesUpdated {
 82        old_range: Range<usize>,
 83        new_count: usize,
 84    },
 85    UpdateMessage {
 86        message_id: ChannelMessageId,
 87        message_ix: usize,
 88    },
 89    NewMessage {
 90        channel_id: ChannelId,
 91        message_id: u64,
 92    },
 93}
 94
 95impl EventEmitter<ChannelChatEvent> for ChannelChat {}
 96pub fn init(client: &AnyProtoClient) {
 97    client.add_entity_message_handler(ChannelChat::handle_message_sent);
 98    client.add_entity_message_handler(ChannelChat::handle_message_removed);
 99    client.add_entity_message_handler(ChannelChat::handle_message_updated);
100}
101
102impl ChannelChat {
103    pub async fn new(
104        channel: Arc<Channel>,
105        channel_store: Entity<ChannelStore>,
106        user_store: Entity<UserStore>,
107        client: Arc<Client>,
108        cx: &mut AsyncApp,
109    ) -> Result<Entity<Self>> {
110        let channel_id = channel.id;
111        let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
112
113        let response = client
114            .request(proto::JoinChannelChat {
115                channel_id: channel_id.0,
116            })
117            .await?;
118
119        let handle = cx.new(|cx| {
120            cx.on_release(Self::release).detach();
121            Self {
122                channel_id: channel.id,
123                user_store: user_store.clone(),
124                channel_store,
125                rpc: client.clone(),
126                outgoing_messages_lock: Default::default(),
127                messages: Default::default(),
128                acknowledged_message_ids: Default::default(),
129                loaded_all_messages: false,
130                next_pending_message_id: 0,
131                last_acknowledged_id: None,
132                rng: StdRng::from_os_rng(),
133                first_loaded_message_id: None,
134                _subscription: subscription.set_entity(&cx.entity(), &cx.to_async()),
135            }
136        })?;
137        Self::handle_loaded_messages(
138            handle.downgrade(),
139            user_store,
140            client,
141            response.messages,
142            response.done,
143            cx,
144        )
145        .await?;
146        Ok(handle)
147    }
148
149    fn release(&mut self, _: &mut App) {
150        self.rpc
151            .send(proto::LeaveChannelChat {
152                channel_id: self.channel_id.0,
153            })
154            .log_err();
155    }
156
157    pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
158        self.channel_store
159            .read(cx)
160            .channel_for_id(self.channel_id)
161            .cloned()
162    }
163
164    pub fn client(&self) -> &Arc<Client> {
165        &self.rpc
166    }
167
168    pub fn send_message(
169        &mut self,
170        message: MessageParams,
171        cx: &mut Context<Self>,
172    ) -> Result<Task<Result<u64>>> {
173        anyhow::ensure!(
174            !message.text.trim().is_empty(),
175            "message body can't be empty"
176        );
177
178        let current_user = self
179            .user_store
180            .read(cx)
181            .current_user()
182            .context("current_user is not present")?;
183
184        let channel_id = self.channel_id;
185        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
186        let nonce = self.rng.random();
187        self.insert_messages(
188            SumTree::from_item(
189                ChannelMessage {
190                    id: pending_id,
191                    body: message.text.clone(),
192                    sender: current_user,
193                    timestamp: OffsetDateTime::now_utc(),
194                    mentions: message.mentions.clone(),
195                    nonce,
196                    reply_to_message_id: message.reply_to_message_id,
197                    edited_at: None,
198                },
199                &(),
200            ),
201            cx,
202        );
203        let user_store = self.user_store.clone();
204        let rpc = self.rpc.clone();
205        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
206
207        // todo - handle messages that fail to send (e.g. >1024 chars)
208        Ok(cx.spawn(async move |this, cx| {
209            let outgoing_message_guard = outgoing_messages_lock.lock().await;
210            let request = rpc.request(proto::SendChannelMessage {
211                channel_id: channel_id.0,
212                body: message.text,
213                nonce: Some(nonce.into()),
214                mentions: mentions_to_proto(&message.mentions),
215                reply_to_message_id: message.reply_to_message_id,
216            });
217            let response = request.await?;
218            drop(outgoing_message_guard);
219            let response = response.message.context("invalid message")?;
220            let id = response.id;
221            let message = ChannelMessage::from_proto(response, &user_store, cx).await?;
222            this.update(cx, |this, cx| {
223                this.insert_messages(SumTree::from_item(message, &()), cx);
224                if this.first_loaded_message_id.is_none() {
225                    this.first_loaded_message_id = Some(id);
226                }
227            })?;
228            Ok(id)
229        }))
230    }
231
232    pub fn remove_message(&mut self, id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
233        let response = self.rpc.request(proto::RemoveChannelMessage {
234            channel_id: self.channel_id.0,
235            message_id: id,
236        });
237        cx.spawn(async move |this, cx| {
238            response.await?;
239            this.update(cx, |this, cx| {
240                this.message_removed(id, cx);
241            })?;
242            Ok(())
243        })
244    }
245
246    pub fn update_message(
247        &mut self,
248        id: u64,
249        message: MessageParams,
250        cx: &mut Context<Self>,
251    ) -> Result<Task<Result<()>>> {
252        self.message_update(
253            ChannelMessageId::Saved(id),
254            message.text.clone(),
255            message.mentions.clone(),
256            Some(OffsetDateTime::now_utc()),
257            cx,
258        );
259
260        let nonce: u128 = self.rng.random();
261
262        let request = self.rpc.request(proto::UpdateChannelMessage {
263            channel_id: self.channel_id.0,
264            message_id: id,
265            body: message.text,
266            nonce: Some(nonce.into()),
267            mentions: mentions_to_proto(&message.mentions),
268        });
269        Ok(cx.spawn(async move |_, _| {
270            request.await?;
271            Ok(())
272        }))
273    }
274
275    pub fn load_more_messages(&mut self, cx: &mut Context<Self>) -> Option<Task<Option<()>>> {
276        if self.loaded_all_messages {
277            return None;
278        }
279
280        let rpc = self.rpc.clone();
281        let user_store = self.user_store.clone();
282        let channel_id = self.channel_id;
283        let before_message_id = self.first_loaded_message_id()?;
284        Some(cx.spawn(async move |this, cx| {
285            async move {
286                let response = rpc
287                    .request(proto::GetChannelMessages {
288                        channel_id: channel_id.0,
289                        before_message_id,
290                    })
291                    .await?;
292                Self::handle_loaded_messages(
293                    this,
294                    user_store,
295                    rpc,
296                    response.messages,
297                    response.done,
298                    cx,
299                )
300                .await?;
301
302                anyhow::Ok(())
303            }
304            .log_err()
305            .await
306        }))
307    }
308
309    pub fn first_loaded_message_id(&mut self) -> Option<u64> {
310        self.first_loaded_message_id
311    }
312
313    /// Load a message by its id, if it's already stored locally.
314    pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
315        self.messages.iter().find(|message| match message.id {
316            ChannelMessageId::Saved(message_id) => message_id == id,
317            ChannelMessageId::Pending(_) => false,
318        })
319    }
320
321    /// Load all of the chat messages since a certain message id.
322    ///
323    /// For now, we always maintain a suffix of the channel's messages.
324    pub async fn load_history_since_message(
325        chat: Entity<Self>,
326        message_id: u64,
327        mut cx: AsyncApp,
328    ) -> Option<usize> {
329        loop {
330            let step = chat
331                .update(&mut cx, |chat, cx| {
332                    if let Some(first_id) = chat.first_loaded_message_id()
333                        && first_id <= message_id
334                    {
335                        let mut cursor = chat
336                            .messages
337                            .cursor::<Dimensions<ChannelMessageId, Count>>(&());
338                        let message_id = ChannelMessageId::Saved(message_id);
339                        cursor.seek(&message_id, Bias::Left);
340                        return ControlFlow::Break(
341                            if cursor
342                                .item()
343                                .is_some_and(|message| message.id == message_id)
344                            {
345                                Some(cursor.start().1.0)
346                            } else {
347                                None
348                            },
349                        );
350                    }
351                    ControlFlow::Continue(chat.load_more_messages(cx))
352                })
353                .log_err()?;
354            match step {
355                ControlFlow::Break(ix) => return ix,
356                ControlFlow::Continue(task) => task?.await?,
357            }
358        }
359    }
360
361    pub fn acknowledge_last_message(&mut self, cx: &mut Context<Self>) {
362        if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id
363            && self
364                .last_acknowledged_id
365                .is_none_or(|acknowledged_id| acknowledged_id < latest_message_id)
366        {
367            self.rpc
368                .send(proto::AckChannelMessage {
369                    channel_id: self.channel_id.0,
370                    message_id: latest_message_id,
371                })
372                .ok();
373            self.last_acknowledged_id = Some(latest_message_id);
374            self.channel_store.update(cx, |store, cx| {
375                store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
376            });
377        }
378    }
379
380    async fn handle_loaded_messages(
381        this: WeakEntity<Self>,
382        user_store: Entity<UserStore>,
383        rpc: Arc<Client>,
384        proto_messages: Vec<proto::ChannelMessage>,
385        loaded_all_messages: bool,
386        cx: &mut AsyncApp,
387    ) -> Result<()> {
388        let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
389
390        let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
391        let loaded_message_ids = this.read_with(cx, |this, _| {
392            let mut loaded_message_ids: HashSet<u64> = HashSet::default();
393            for message in loaded_messages.iter() {
394                if let Some(saved_message_id) = message.id.into() {
395                    loaded_message_ids.insert(saved_message_id);
396                }
397            }
398            for message in this.messages.iter() {
399                if let Some(saved_message_id) = message.id.into() {
400                    loaded_message_ids.insert(saved_message_id);
401                }
402            }
403            loaded_message_ids
404        })?;
405
406        let missing_ancestors = loaded_messages
407            .iter()
408            .filter_map(|message| {
409                if let Some(ancestor_id) = message.reply_to_message_id
410                    && !loaded_message_ids.contains(&ancestor_id)
411                {
412                    return Some(ancestor_id);
413                }
414                None
415            })
416            .collect::<Vec<_>>();
417
418        let loaded_ancestors = if missing_ancestors.is_empty() {
419            None
420        } else {
421            let response = rpc
422                .request(proto::GetChannelMessagesById {
423                    message_ids: missing_ancestors,
424                })
425                .await?;
426            Some(messages_from_proto(response.messages, &user_store, cx).await?)
427        };
428        this.update(cx, |this, cx| {
429            this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
430            this.loaded_all_messages = loaded_all_messages;
431            this.insert_messages(loaded_messages, cx);
432            if let Some(loaded_ancestors) = loaded_ancestors {
433                this.insert_messages(loaded_ancestors, cx);
434            }
435        })?;
436
437        Ok(())
438    }
439
440    pub fn rejoin(&mut self, cx: &mut Context<Self>) {
441        let user_store = self.user_store.clone();
442        let rpc = self.rpc.clone();
443        let channel_id = self.channel_id;
444        cx.spawn(async move |this, cx| {
445            async move {
446                let response = rpc
447                    .request(proto::JoinChannelChat {
448                        channel_id: channel_id.0,
449                    })
450                    .await?;
451                Self::handle_loaded_messages(
452                    this.clone(),
453                    user_store.clone(),
454                    rpc.clone(),
455                    response.messages,
456                    response.done,
457                    cx,
458                )
459                .await?;
460
461                let pending_messages = this.read_with(cx, |this, _| {
462                    this.pending_messages().cloned().collect::<Vec<_>>()
463                })?;
464
465                for pending_message in pending_messages {
466                    let request = rpc.request(proto::SendChannelMessage {
467                        channel_id: channel_id.0,
468                        body: pending_message.body,
469                        mentions: mentions_to_proto(&pending_message.mentions),
470                        nonce: Some(pending_message.nonce.into()),
471                        reply_to_message_id: pending_message.reply_to_message_id,
472                    });
473                    let response = request.await?;
474                    let message = ChannelMessage::from_proto(
475                        response.message.context("invalid message")?,
476                        &user_store,
477                        cx,
478                    )
479                    .await?;
480                    this.update(cx, |this, cx| {
481                        this.insert_messages(SumTree::from_item(message, &()), cx);
482                    })?;
483                }
484
485                anyhow::Ok(())
486            }
487            .log_err()
488            .await
489        })
490        .detach();
491    }
492
493    pub fn message_count(&self) -> usize {
494        self.messages.summary().count
495    }
496
497    pub fn messages(&self) -> &SumTree<ChannelMessage> {
498        &self.messages
499    }
500
501    pub fn message(&self, ix: usize) -> &ChannelMessage {
502        let mut cursor = self.messages.cursor::<Count>(&());
503        cursor.seek(&Count(ix), Bias::Right);
504        cursor.item().unwrap()
505    }
506
507    pub fn acknowledge_message(&mut self, id: u64) {
508        if self.acknowledged_message_ids.insert(id) {
509            self.rpc
510                .send(proto::AckChannelMessage {
511                    channel_id: self.channel_id.0,
512                    message_id: id,
513                })
514                .ok();
515        }
516    }
517
518    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
519        let mut cursor = self.messages.cursor::<Count>(&());
520        cursor.seek(&Count(range.start), Bias::Right);
521        cursor.take(range.len())
522    }
523
524    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
525        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
526        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left);
527        cursor
528    }
529
530    async fn handle_message_sent(
531        this: Entity<Self>,
532        message: TypedEnvelope<proto::ChannelMessageSent>,
533        mut cx: AsyncApp,
534    ) -> Result<()> {
535        let user_store = this.read_with(&cx, |this, _| this.user_store.clone())?;
536        let message = message.payload.message.context("empty message")?;
537        let message_id = message.id;
538
539        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
540        this.update(&mut cx, |this, cx| {
541            this.insert_messages(SumTree::from_item(message, &()), cx);
542            cx.emit(ChannelChatEvent::NewMessage {
543                channel_id: this.channel_id,
544                message_id,
545            })
546        })?;
547
548        Ok(())
549    }
550
551    async fn handle_message_removed(
552        this: Entity<Self>,
553        message: TypedEnvelope<proto::RemoveChannelMessage>,
554        mut cx: AsyncApp,
555    ) -> Result<()> {
556        this.update(&mut cx, |this, cx| {
557            this.message_removed(message.payload.message_id, cx)
558        })?;
559        Ok(())
560    }
561
562    async fn handle_message_updated(
563        this: Entity<Self>,
564        message: TypedEnvelope<proto::ChannelMessageUpdate>,
565        mut cx: AsyncApp,
566    ) -> Result<()> {
567        let user_store = this.read_with(&cx, |this, _| this.user_store.clone())?;
568        let message = message.payload.message.context("empty message")?;
569
570        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
571
572        this.update(&mut cx, |this, cx| {
573            this.message_update(
574                message.id,
575                message.body,
576                message.mentions,
577                message.edited_at,
578                cx,
579            )
580        })?;
581        Ok(())
582    }
583
584    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut Context<Self>) {
585        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
586            let nonces = messages
587                .cursor::<()>(&())
588                .map(|m| m.nonce)
589                .collect::<HashSet<_>>();
590
591            let mut old_cursor = self
592                .messages
593                .cursor::<Dimensions<ChannelMessageId, Count>>(&());
594            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left);
595            let start_ix = old_cursor.start().1.0;
596            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right);
597            let removed_count = removed_messages.summary().count;
598            let new_count = messages.summary().count;
599            let end_ix = start_ix + removed_count;
600
601            new_messages.append(messages, &());
602
603            let mut ranges = Vec::<Range<usize>>::new();
604            if new_messages.last().unwrap().is_pending() {
605                new_messages.append(old_cursor.suffix(), &());
606            } else {
607                new_messages.append(
608                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left),
609                    &(),
610                );
611
612                while let Some(message) = old_cursor.item() {
613                    let message_ix = old_cursor.start().1.0;
614                    if nonces.contains(&message.nonce) {
615                        if ranges.last().is_some_and(|r| r.end == message_ix) {
616                            ranges.last_mut().unwrap().end += 1;
617                        } else {
618                            ranges.push(message_ix..message_ix + 1);
619                        }
620                    } else {
621                        new_messages.push(message.clone(), &());
622                    }
623                    old_cursor.next();
624                }
625            }
626
627            drop(old_cursor);
628            self.messages = new_messages;
629
630            for range in ranges.into_iter().rev() {
631                cx.emit(ChannelChatEvent::MessagesUpdated {
632                    old_range: range,
633                    new_count: 0,
634                });
635            }
636            cx.emit(ChannelChatEvent::MessagesUpdated {
637                old_range: start_ix..end_ix,
638                new_count,
639            });
640
641            cx.notify();
642        }
643    }
644
645    fn message_removed(&mut self, id: u64, cx: &mut Context<Self>) {
646        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
647        let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left);
648        if let Some(item) = cursor.item()
649            && item.id == ChannelMessageId::Saved(id)
650        {
651            let deleted_message_ix = messages.summary().count;
652            cursor.next();
653            messages.append(cursor.suffix(), &());
654            drop(cursor);
655            self.messages = messages;
656
657            // If the message that was deleted was the last acknowledged message,
658            // replace the acknowledged message with an earlier one.
659            self.channel_store.update(cx, |store, _| {
660                let summary = self.messages.summary();
661                if summary.count == 0 {
662                    store.set_acknowledged_message_id(self.channel_id, None);
663                } else if deleted_message_ix == summary.count
664                    && let ChannelMessageId::Saved(id) = summary.max_id
665                {
666                    store.set_acknowledged_message_id(self.channel_id, Some(id));
667                }
668            });
669
670            cx.emit(ChannelChatEvent::MessagesUpdated {
671                old_range: deleted_message_ix..deleted_message_ix + 1,
672                new_count: 0,
673            });
674        }
675    }
676
677    fn message_update(
678        &mut self,
679        id: ChannelMessageId,
680        body: String,
681        mentions: Vec<(Range<usize>, u64)>,
682        edited_at: Option<OffsetDateTime>,
683        cx: &mut Context<Self>,
684    ) {
685        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
686        let mut messages = cursor.slice(&id, Bias::Left);
687        let ix = messages.summary().count;
688
689        if let Some(mut message_to_update) = cursor.item().cloned() {
690            message_to_update.body = body;
691            message_to_update.mentions = mentions;
692            message_to_update.edited_at = edited_at;
693            messages.push(message_to_update, &());
694            cursor.next();
695        }
696
697        messages.append(cursor.suffix(), &());
698        drop(cursor);
699        self.messages = messages;
700
701        cx.emit(ChannelChatEvent::UpdateMessage {
702            message_ix: ix,
703            message_id: id,
704        });
705
706        cx.notify();
707    }
708}
709
710async fn messages_from_proto(
711    proto_messages: Vec<proto::ChannelMessage>,
712    user_store: &Entity<UserStore>,
713    cx: &mut AsyncApp,
714) -> Result<SumTree<ChannelMessage>> {
715    let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
716    let mut result = SumTree::default();
717    result.extend(messages, &());
718    Ok(result)
719}
720
721impl ChannelMessage {
722    pub async fn from_proto(
723        message: proto::ChannelMessage,
724        user_store: &Entity<UserStore>,
725        cx: &mut AsyncApp,
726    ) -> Result<Self> {
727        let sender = user_store
728            .update(cx, |user_store, cx| {
729                user_store.get_user(message.sender_id, cx)
730            })?
731            .await?;
732
733        let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
734            if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
735                return Some(a);
736            }
737
738            None
739        });
740
741        Ok(ChannelMessage {
742            id: ChannelMessageId::Saved(message.id),
743            body: message.body,
744            mentions: message
745                .mentions
746                .into_iter()
747                .filter_map(|mention| {
748                    let range = mention.range?;
749                    Some((range.start as usize..range.end as usize, mention.user_id))
750                })
751                .collect(),
752            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
753            sender,
754            nonce: message.nonce.context("nonce is required")?.into(),
755            reply_to_message_id: message.reply_to_message_id,
756            edited_at,
757        })
758    }
759
760    pub fn is_pending(&self) -> bool {
761        matches!(self.id, ChannelMessageId::Pending(_))
762    }
763
764    pub async fn from_proto_vec(
765        proto_messages: Vec<proto::ChannelMessage>,
766        user_store: &Entity<UserStore>,
767        cx: &mut AsyncApp,
768    ) -> Result<Vec<Self>> {
769        let unique_user_ids = proto_messages
770            .iter()
771            .map(|m| m.sender_id)
772            .collect::<HashSet<_>>()
773            .into_iter()
774            .collect();
775        user_store
776            .update(cx, |user_store, cx| {
777                user_store.get_users(unique_user_ids, cx)
778            })?
779            .await?;
780
781        let mut messages = Vec::with_capacity(proto_messages.len());
782        for message in proto_messages {
783            messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
784        }
785        Ok(messages)
786    }
787}
788
789pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
790    mentions
791        .iter()
792        .map(|(range, user_id)| proto::ChatMention {
793            range: Some(proto::Range {
794                start: range.start as u64,
795                end: range.end as u64,
796            }),
797            user_id: *user_id,
798        })
799        .collect()
800}
801
802impl sum_tree::Item for ChannelMessage {
803    type Summary = ChannelMessageSummary;
804
805    fn summary(&self, _cx: &()) -> Self::Summary {
806        ChannelMessageSummary {
807            max_id: self.id,
808            count: 1,
809        }
810    }
811}
812
813impl Default for ChannelMessageId {
814    fn default() -> Self {
815        Self::Saved(0)
816    }
817}
818
819impl sum_tree::Summary for ChannelMessageSummary {
820    type Context = ();
821
822    fn zero(_cx: &Self::Context) -> Self {
823        Default::default()
824    }
825
826    fn add_summary(&mut self, summary: &Self, _: &()) {
827        self.max_id = summary.max_id;
828        self.count += summary.count;
829    }
830}
831
832impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
833    fn zero(_cx: &()) -> Self {
834        Default::default()
835    }
836
837    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
838        debug_assert!(summary.max_id > *self);
839        *self = summary.max_id;
840    }
841}
842
843impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
844    fn zero(_cx: &()) -> Self {
845        Default::default()
846    }
847
848    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
849        self.0 += summary.count;
850    }
851}
852
853impl<'a> From<&'a str> for MessageParams {
854    fn from(value: &'a str) -> Self {
855        Self {
856            text: value.into(),
857            mentions: Vec::new(),
858            reply_to_message_id: None,
859        }
860    }
861}