channel_chat.rs

  1use crate::{Channel, ChannelStore};
  2use anyhow::{anyhow, Result};
  3use client::{
  4    proto,
  5    user::{User, UserStore},
  6    ChannelId, Client, Subscription, TypedEnvelope, UserId,
  7};
  8use collections::HashSet;
  9use futures::lock::Mutex;
 10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
 11use rand::prelude::*;
 12use rpc::AnyProtoClient;
 13use std::{
 14    ops::{ControlFlow, Range},
 15    sync::Arc,
 16};
 17use sum_tree::{Bias, SumTree};
 18use time::OffsetDateTime;
 19use util::{post_inc, ResultExt as _, TryFutureExt};
 20
 21pub struct ChannelChat {
 22    pub channel_id: ChannelId,
 23    messages: SumTree<ChannelMessage>,
 24    acknowledged_message_ids: HashSet<u64>,
 25    channel_store: Entity<ChannelStore>,
 26    loaded_all_messages: bool,
 27    last_acknowledged_id: Option<u64>,
 28    next_pending_message_id: usize,
 29    first_loaded_message_id: Option<u64>,
 30    user_store: Entity<UserStore>,
 31    rpc: Arc<Client>,
 32    outgoing_messages_lock: Arc<Mutex<()>>,
 33    rng: StdRng,
 34    _subscription: Subscription,
 35}
 36
 37#[derive(Debug, PartialEq, Eq)]
 38pub struct MessageParams {
 39    pub text: String,
 40    pub mentions: Vec<(Range<usize>, UserId)>,
 41    pub reply_to_message_id: Option<u64>,
 42}
 43
 44#[derive(Clone, Debug)]
 45pub struct ChannelMessage {
 46    pub id: ChannelMessageId,
 47    pub body: String,
 48    pub timestamp: OffsetDateTime,
 49    pub sender: Arc<User>,
 50    pub nonce: u128,
 51    pub mentions: Vec<(Range<usize>, UserId)>,
 52    pub reply_to_message_id: Option<u64>,
 53    pub edited_at: Option<OffsetDateTime>,
 54}
 55
 56#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 57pub enum ChannelMessageId {
 58    Saved(u64),
 59    Pending(usize),
 60}
 61
 62impl From<ChannelMessageId> for Option<u64> {
 63    fn from(val: ChannelMessageId) -> Self {
 64        match val {
 65            ChannelMessageId::Saved(id) => Some(id),
 66            ChannelMessageId::Pending(_) => None,
 67        }
 68    }
 69}
 70
 71#[derive(Clone, Debug, Default)]
 72pub struct ChannelMessageSummary {
 73    max_id: ChannelMessageId,
 74    count: usize,
 75}
 76
 77#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
 78struct Count(usize);
 79
 80#[derive(Clone, Debug, PartialEq)]
 81pub enum ChannelChatEvent {
 82    MessagesUpdated {
 83        old_range: Range<usize>,
 84        new_count: usize,
 85    },
 86    UpdateMessage {
 87        message_id: ChannelMessageId,
 88        message_ix: usize,
 89    },
 90    NewMessage {
 91        channel_id: ChannelId,
 92        message_id: u64,
 93    },
 94}
 95
 96impl EventEmitter<ChannelChatEvent> for ChannelChat {}
 97pub fn init(client: &AnyProtoClient) {
 98    client.add_entity_message_handler(ChannelChat::handle_message_sent);
 99    client.add_entity_message_handler(ChannelChat::handle_message_removed);
100    client.add_entity_message_handler(ChannelChat::handle_message_updated);
101}
102
103impl ChannelChat {
104    pub async fn new(
105        channel: Arc<Channel>,
106        channel_store: Entity<ChannelStore>,
107        user_store: Entity<UserStore>,
108        client: Arc<Client>,
109        mut cx: AsyncApp,
110    ) -> Result<Entity<Self>> {
111        let channel_id = channel.id;
112        let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
113
114        let response = client
115            .request(proto::JoinChannelChat {
116                channel_id: channel_id.0,
117            })
118            .await?;
119
120        let handle = cx.new(|cx| {
121            cx.on_release(Self::release).detach();
122            Self {
123                channel_id: channel.id,
124                user_store: user_store.clone(),
125                channel_store,
126                rpc: client.clone(),
127                outgoing_messages_lock: Default::default(),
128                messages: Default::default(),
129                acknowledged_message_ids: Default::default(),
130                loaded_all_messages: false,
131                next_pending_message_id: 0,
132                last_acknowledged_id: None,
133                rng: StdRng::from_entropy(),
134                first_loaded_message_id: None,
135                _subscription: subscription.set_entity(&cx.entity(), &mut cx.to_async()),
136            }
137        })?;
138        Self::handle_loaded_messages(
139            handle.downgrade(),
140            user_store,
141            client,
142            response.messages,
143            response.done,
144            &mut cx,
145        )
146        .await?;
147        Ok(handle)
148    }
149
150    fn release(&mut self, _: &mut App) {
151        self.rpc
152            .send(proto::LeaveChannelChat {
153                channel_id: self.channel_id.0,
154            })
155            .log_err();
156    }
157
158    pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
159        self.channel_store
160            .read(cx)
161            .channel_for_id(self.channel_id)
162            .cloned()
163    }
164
165    pub fn client(&self) -> &Arc<Client> {
166        &self.rpc
167    }
168
169    pub fn send_message(
170        &mut self,
171        message: MessageParams,
172        cx: &mut Context<Self>,
173    ) -> Result<Task<Result<u64>>> {
174        if message.text.trim().is_empty() {
175            Err(anyhow!("message body can't be empty"))?;
176        }
177
178        let current_user = self
179            .user_store
180            .read(cx)
181            .current_user()
182            .ok_or_else(|| anyhow!("current_user is not present"))?;
183
184        let channel_id = self.channel_id;
185        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
186        let nonce = self.rng.gen();
187        self.insert_messages(
188            SumTree::from_item(
189                ChannelMessage {
190                    id: pending_id,
191                    body: message.text.clone(),
192                    sender: current_user,
193                    timestamp: OffsetDateTime::now_utc(),
194                    mentions: message.mentions.clone(),
195                    nonce,
196                    reply_to_message_id: message.reply_to_message_id,
197                    edited_at: None,
198                },
199                &(),
200            ),
201            cx,
202        );
203        let user_store = self.user_store.clone();
204        let rpc = self.rpc.clone();
205        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
206
207        // todo - handle messages that fail to send (e.g. >1024 chars)
208        Ok(cx.spawn(move |this, mut cx| async move {
209            let outgoing_message_guard = outgoing_messages_lock.lock().await;
210            let request = rpc.request(proto::SendChannelMessage {
211                channel_id: channel_id.0,
212                body: message.text,
213                nonce: Some(nonce.into()),
214                mentions: mentions_to_proto(&message.mentions),
215                reply_to_message_id: message.reply_to_message_id,
216            });
217            let response = request.await?;
218            drop(outgoing_message_guard);
219            let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
220            let id = response.id;
221            let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
222            this.update(&mut cx, |this, cx| {
223                this.insert_messages(SumTree::from_item(message, &()), cx);
224                if this.first_loaded_message_id.is_none() {
225                    this.first_loaded_message_id = Some(id);
226                }
227            })?;
228            Ok(id)
229        }))
230    }
231
232    pub fn remove_message(&mut self, id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
233        let response = self.rpc.request(proto::RemoveChannelMessage {
234            channel_id: self.channel_id.0,
235            message_id: id,
236        });
237        cx.spawn(move |this, mut cx| async move {
238            response.await?;
239            this.update(&mut cx, |this, cx| {
240                this.message_removed(id, cx);
241            })?;
242            Ok(())
243        })
244    }
245
246    pub fn update_message(
247        &mut self,
248        id: u64,
249        message: MessageParams,
250        cx: &mut Context<Self>,
251    ) -> Result<Task<Result<()>>> {
252        self.message_update(
253            ChannelMessageId::Saved(id),
254            message.text.clone(),
255            message.mentions.clone(),
256            Some(OffsetDateTime::now_utc()),
257            cx,
258        );
259
260        let nonce: u128 = self.rng.gen();
261
262        let request = self.rpc.request(proto::UpdateChannelMessage {
263            channel_id: self.channel_id.0,
264            message_id: id,
265            body: message.text,
266            nonce: Some(nonce.into()),
267            mentions: mentions_to_proto(&message.mentions),
268        });
269        Ok(cx.spawn(move |_, _| async move {
270            request.await?;
271            Ok(())
272        }))
273    }
274
275    pub fn load_more_messages(&mut self, cx: &mut Context<Self>) -> Option<Task<Option<()>>> {
276        if self.loaded_all_messages {
277            return None;
278        }
279
280        let rpc = self.rpc.clone();
281        let user_store = self.user_store.clone();
282        let channel_id = self.channel_id;
283        let before_message_id = self.first_loaded_message_id()?;
284        Some(cx.spawn(move |this, mut cx| {
285            async move {
286                let response = rpc
287                    .request(proto::GetChannelMessages {
288                        channel_id: channel_id.0,
289                        before_message_id,
290                    })
291                    .await?;
292                Self::handle_loaded_messages(
293                    this,
294                    user_store,
295                    rpc,
296                    response.messages,
297                    response.done,
298                    &mut cx,
299                )
300                .await?;
301
302                anyhow::Ok(())
303            }
304            .log_err()
305        }))
306    }
307
308    pub fn first_loaded_message_id(&mut self) -> Option<u64> {
309        self.first_loaded_message_id
310    }
311
312    /// Load a message by its id, if it's already stored locally.
313    pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
314        self.messages.iter().find(|message| match message.id {
315            ChannelMessageId::Saved(message_id) => message_id == id,
316            ChannelMessageId::Pending(_) => false,
317        })
318    }
319
320    /// Load all of the chat messages since a certain message id.
321    ///
322    /// For now, we always maintain a suffix of the channel's messages.
323    pub async fn load_history_since_message(
324        chat: Entity<Self>,
325        message_id: u64,
326        mut cx: AsyncApp,
327    ) -> Option<usize> {
328        loop {
329            let step = chat
330                .update(&mut cx, |chat, cx| {
331                    if let Some(first_id) = chat.first_loaded_message_id() {
332                        if first_id <= message_id {
333                            let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>(&());
334                            let message_id = ChannelMessageId::Saved(message_id);
335                            cursor.seek(&message_id, Bias::Left, &());
336                            return ControlFlow::Break(
337                                if cursor
338                                    .item()
339                                    .map_or(false, |message| message.id == message_id)
340                                {
341                                    Some(cursor.start().1 .0)
342                                } else {
343                                    None
344                                },
345                            );
346                        }
347                    }
348                    ControlFlow::Continue(chat.load_more_messages(cx))
349                })
350                .log_err()?;
351            match step {
352                ControlFlow::Break(ix) => return ix,
353                ControlFlow::Continue(task) => task?.await?,
354            }
355        }
356    }
357
358    pub fn acknowledge_last_message(&mut self, cx: &mut Context<Self>) {
359        if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
360            if self
361                .last_acknowledged_id
362                .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
363            {
364                self.rpc
365                    .send(proto::AckChannelMessage {
366                        channel_id: self.channel_id.0,
367                        message_id: latest_message_id,
368                    })
369                    .ok();
370                self.last_acknowledged_id = Some(latest_message_id);
371                self.channel_store.update(cx, |store, cx| {
372                    store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
373                });
374            }
375        }
376    }
377
378    async fn handle_loaded_messages(
379        this: WeakEntity<Self>,
380        user_store: Entity<UserStore>,
381        rpc: Arc<Client>,
382        proto_messages: Vec<proto::ChannelMessage>,
383        loaded_all_messages: bool,
384        cx: &mut AsyncApp,
385    ) -> Result<()> {
386        let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
387
388        let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
389        let loaded_message_ids = this.update(cx, |this, _| {
390            let mut loaded_message_ids: HashSet<u64> = HashSet::default();
391            for message in loaded_messages.iter() {
392                if let Some(saved_message_id) = message.id.into() {
393                    loaded_message_ids.insert(saved_message_id);
394                }
395            }
396            for message in this.messages.iter() {
397                if let Some(saved_message_id) = message.id.into() {
398                    loaded_message_ids.insert(saved_message_id);
399                }
400            }
401            loaded_message_ids
402        })?;
403
404        let missing_ancestors = loaded_messages
405            .iter()
406            .filter_map(|message| {
407                if let Some(ancestor_id) = message.reply_to_message_id {
408                    if !loaded_message_ids.contains(&ancestor_id) {
409                        return Some(ancestor_id);
410                    }
411                }
412                None
413            })
414            .collect::<Vec<_>>();
415
416        let loaded_ancestors = if missing_ancestors.is_empty() {
417            None
418        } else {
419            let response = rpc
420                .request(proto::GetChannelMessagesById {
421                    message_ids: missing_ancestors,
422                })
423                .await?;
424            Some(messages_from_proto(response.messages, &user_store, cx).await?)
425        };
426        this.update(cx, |this, cx| {
427            this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
428            this.loaded_all_messages = loaded_all_messages;
429            this.insert_messages(loaded_messages, cx);
430            if let Some(loaded_ancestors) = loaded_ancestors {
431                this.insert_messages(loaded_ancestors, cx);
432            }
433        })?;
434
435        Ok(())
436    }
437
438    pub fn rejoin(&mut self, cx: &mut Context<Self>) {
439        let user_store = self.user_store.clone();
440        let rpc = self.rpc.clone();
441        let channel_id = self.channel_id;
442        cx.spawn(move |this, mut cx| {
443            async move {
444                let response = rpc
445                    .request(proto::JoinChannelChat {
446                        channel_id: channel_id.0,
447                    })
448                    .await?;
449                Self::handle_loaded_messages(
450                    this.clone(),
451                    user_store.clone(),
452                    rpc.clone(),
453                    response.messages,
454                    response.done,
455                    &mut cx,
456                )
457                .await?;
458
459                let pending_messages = this.update(&mut cx, |this, _| {
460                    this.pending_messages().cloned().collect::<Vec<_>>()
461                })?;
462
463                for pending_message in pending_messages {
464                    let request = rpc.request(proto::SendChannelMessage {
465                        channel_id: channel_id.0,
466                        body: pending_message.body,
467                        mentions: mentions_to_proto(&pending_message.mentions),
468                        nonce: Some(pending_message.nonce.into()),
469                        reply_to_message_id: pending_message.reply_to_message_id,
470                    });
471                    let response = request.await?;
472                    let message = ChannelMessage::from_proto(
473                        response.message.ok_or_else(|| anyhow!("invalid message"))?,
474                        &user_store,
475                        &mut cx,
476                    )
477                    .await?;
478                    this.update(&mut cx, |this, cx| {
479                        this.insert_messages(SumTree::from_item(message, &()), cx);
480                    })?;
481                }
482
483                anyhow::Ok(())
484            }
485            .log_err()
486        })
487        .detach();
488    }
489
490    pub fn message_count(&self) -> usize {
491        self.messages.summary().count
492    }
493
494    pub fn messages(&self) -> &SumTree<ChannelMessage> {
495        &self.messages
496    }
497
498    pub fn message(&self, ix: usize) -> &ChannelMessage {
499        let mut cursor = self.messages.cursor::<Count>(&());
500        cursor.seek(&Count(ix), Bias::Right, &());
501        cursor.item().unwrap()
502    }
503
504    pub fn acknowledge_message(&mut self, id: u64) {
505        if self.acknowledged_message_ids.insert(id) {
506            self.rpc
507                .send(proto::AckChannelMessage {
508                    channel_id: self.channel_id.0,
509                    message_id: id,
510                })
511                .ok();
512        }
513    }
514
515    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
516        let mut cursor = self.messages.cursor::<Count>(&());
517        cursor.seek(&Count(range.start), Bias::Right, &());
518        cursor.take(range.len())
519    }
520
521    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
522        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
523        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
524        cursor
525    }
526
527    async fn handle_message_sent(
528        this: Entity<Self>,
529        message: TypedEnvelope<proto::ChannelMessageSent>,
530        mut cx: AsyncApp,
531    ) -> Result<()> {
532        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
533        let message = message
534            .payload
535            .message
536            .ok_or_else(|| anyhow!("empty message"))?;
537        let message_id = message.id;
538
539        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
540        this.update(&mut cx, |this, cx| {
541            this.insert_messages(SumTree::from_item(message, &()), cx);
542            cx.emit(ChannelChatEvent::NewMessage {
543                channel_id: this.channel_id,
544                message_id,
545            })
546        })?;
547
548        Ok(())
549    }
550
551    async fn handle_message_removed(
552        this: Entity<Self>,
553        message: TypedEnvelope<proto::RemoveChannelMessage>,
554        mut cx: AsyncApp,
555    ) -> Result<()> {
556        this.update(&mut cx, |this, cx| {
557            this.message_removed(message.payload.message_id, cx)
558        })?;
559        Ok(())
560    }
561
562    async fn handle_message_updated(
563        this: Entity<Self>,
564        message: TypedEnvelope<proto::ChannelMessageUpdate>,
565        mut cx: AsyncApp,
566    ) -> Result<()> {
567        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
568        let message = message
569            .payload
570            .message
571            .ok_or_else(|| anyhow!("empty message"))?;
572
573        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
574
575        this.update(&mut cx, |this, cx| {
576            this.message_update(
577                message.id,
578                message.body,
579                message.mentions,
580                message.edited_at,
581                cx,
582            )
583        })?;
584        Ok(())
585    }
586
587    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut Context<Self>) {
588        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
589            let nonces = messages
590                .cursor::<()>(&())
591                .map(|m| m.nonce)
592                .collect::<HashSet<_>>();
593
594            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(&());
595            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
596            let start_ix = old_cursor.start().1 .0;
597            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
598            let removed_count = removed_messages.summary().count;
599            let new_count = messages.summary().count;
600            let end_ix = start_ix + removed_count;
601
602            new_messages.append(messages, &());
603
604            let mut ranges = Vec::<Range<usize>>::new();
605            if new_messages.last().unwrap().is_pending() {
606                new_messages.append(old_cursor.suffix(&()), &());
607            } else {
608                new_messages.append(
609                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
610                    &(),
611                );
612
613                while let Some(message) = old_cursor.item() {
614                    let message_ix = old_cursor.start().1 .0;
615                    if nonces.contains(&message.nonce) {
616                        if ranges.last().map_or(false, |r| r.end == message_ix) {
617                            ranges.last_mut().unwrap().end += 1;
618                        } else {
619                            ranges.push(message_ix..message_ix + 1);
620                        }
621                    } else {
622                        new_messages.push(message.clone(), &());
623                    }
624                    old_cursor.next(&());
625                }
626            }
627
628            drop(old_cursor);
629            self.messages = new_messages;
630
631            for range in ranges.into_iter().rev() {
632                cx.emit(ChannelChatEvent::MessagesUpdated {
633                    old_range: range,
634                    new_count: 0,
635                });
636            }
637            cx.emit(ChannelChatEvent::MessagesUpdated {
638                old_range: start_ix..end_ix,
639                new_count,
640            });
641
642            cx.notify();
643        }
644    }
645
646    fn message_removed(&mut self, id: u64, cx: &mut Context<Self>) {
647        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
648        let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
649        if let Some(item) = cursor.item() {
650            if item.id == ChannelMessageId::Saved(id) {
651                let deleted_message_ix = messages.summary().count;
652                cursor.next(&());
653                messages.append(cursor.suffix(&()), &());
654                drop(cursor);
655                self.messages = messages;
656
657                // If the message that was deleted was the last acknowledged message,
658                // replace the acknowledged message with an earlier one.
659                self.channel_store.update(cx, |store, _| {
660                    let summary = self.messages.summary();
661                    if summary.count == 0 {
662                        store.set_acknowledged_message_id(self.channel_id, None);
663                    } else if deleted_message_ix == summary.count {
664                        if let ChannelMessageId::Saved(id) = summary.max_id {
665                            store.set_acknowledged_message_id(self.channel_id, Some(id));
666                        }
667                    }
668                });
669
670                cx.emit(ChannelChatEvent::MessagesUpdated {
671                    old_range: deleted_message_ix..deleted_message_ix + 1,
672                    new_count: 0,
673                });
674            }
675        }
676    }
677
678    fn message_update(
679        &mut self,
680        id: ChannelMessageId,
681        body: String,
682        mentions: Vec<(Range<usize>, u64)>,
683        edited_at: Option<OffsetDateTime>,
684        cx: &mut Context<Self>,
685    ) {
686        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
687        let mut messages = cursor.slice(&id, Bias::Left, &());
688        let ix = messages.summary().count;
689
690        if let Some(mut message_to_update) = cursor.item().cloned() {
691            message_to_update.body = body;
692            message_to_update.mentions = mentions;
693            message_to_update.edited_at = edited_at;
694            messages.push(message_to_update, &());
695            cursor.next(&());
696        }
697
698        messages.append(cursor.suffix(&()), &());
699        drop(cursor);
700        self.messages = messages;
701
702        cx.emit(ChannelChatEvent::UpdateMessage {
703            message_ix: ix,
704            message_id: id,
705        });
706
707        cx.notify();
708    }
709}
710
711async fn messages_from_proto(
712    proto_messages: Vec<proto::ChannelMessage>,
713    user_store: &Entity<UserStore>,
714    cx: &mut AsyncApp,
715) -> Result<SumTree<ChannelMessage>> {
716    let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
717    let mut result = SumTree::default();
718    result.extend(messages, &());
719    Ok(result)
720}
721
722impl ChannelMessage {
723    pub async fn from_proto(
724        message: proto::ChannelMessage,
725        user_store: &Entity<UserStore>,
726        cx: &mut AsyncApp,
727    ) -> Result<Self> {
728        let sender = user_store
729            .update(cx, |user_store, cx| {
730                user_store.get_user(message.sender_id, cx)
731            })?
732            .await?;
733
734        let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
735            if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
736                return Some(a);
737            }
738
739            None
740        });
741
742        Ok(ChannelMessage {
743            id: ChannelMessageId::Saved(message.id),
744            body: message.body,
745            mentions: message
746                .mentions
747                .into_iter()
748                .filter_map(|mention| {
749                    let range = mention.range?;
750                    Some((range.start as usize..range.end as usize, mention.user_id))
751                })
752                .collect(),
753            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
754            sender,
755            nonce: message
756                .nonce
757                .ok_or_else(|| anyhow!("nonce is required"))?
758                .into(),
759            reply_to_message_id: message.reply_to_message_id,
760            edited_at,
761        })
762    }
763
764    pub fn is_pending(&self) -> bool {
765        matches!(self.id, ChannelMessageId::Pending(_))
766    }
767
768    pub async fn from_proto_vec(
769        proto_messages: Vec<proto::ChannelMessage>,
770        user_store: &Entity<UserStore>,
771        cx: &mut AsyncApp,
772    ) -> Result<Vec<Self>> {
773        let unique_user_ids = proto_messages
774            .iter()
775            .map(|m| m.sender_id)
776            .collect::<HashSet<_>>()
777            .into_iter()
778            .collect();
779        user_store
780            .update(cx, |user_store, cx| {
781                user_store.get_users(unique_user_ids, cx)
782            })?
783            .await?;
784
785        let mut messages = Vec::with_capacity(proto_messages.len());
786        for message in proto_messages {
787            messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
788        }
789        Ok(messages)
790    }
791}
792
793pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
794    mentions
795        .iter()
796        .map(|(range, user_id)| proto::ChatMention {
797            range: Some(proto::Range {
798                start: range.start as u64,
799                end: range.end as u64,
800            }),
801            user_id: *user_id,
802        })
803        .collect()
804}
805
806impl sum_tree::Item for ChannelMessage {
807    type Summary = ChannelMessageSummary;
808
809    fn summary(&self, _cx: &()) -> Self::Summary {
810        ChannelMessageSummary {
811            max_id: self.id,
812            count: 1,
813        }
814    }
815}
816
817impl Default for ChannelMessageId {
818    fn default() -> Self {
819        Self::Saved(0)
820    }
821}
822
823impl sum_tree::Summary for ChannelMessageSummary {
824    type Context = ();
825
826    fn zero(_cx: &Self::Context) -> Self {
827        Default::default()
828    }
829
830    fn add_summary(&mut self, summary: &Self, _: &()) {
831        self.max_id = summary.max_id;
832        self.count += summary.count;
833    }
834}
835
836impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
837    fn zero(_cx: &()) -> Self {
838        Default::default()
839    }
840
841    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
842        debug_assert!(summary.max_id > *self);
843        *self = summary.max_id;
844    }
845}
846
847impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
848    fn zero(_cx: &()) -> Self {
849        Default::default()
850    }
851
852    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
853        self.0 += summary.count;
854    }
855}
856
857impl<'a> From<&'a str> for MessageParams {
858    fn from(value: &'a str) -> Self {
859        Self {
860            text: value.into(),
861            mentions: Vec::new(),
862            reply_to_message_id: None,
863        }
864    }
865}