1use crate::{Channel, ChannelStore};
2use anyhow::{anyhow, Result};
3use client::{
4 proto,
5 user::{User, UserStore},
6 ChannelId, Client, Subscription, TypedEnvelope, UserId,
7};
8use collections::HashSet;
9use futures::lock::Mutex;
10use gpui::{
11 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
12};
13use rand::prelude::*;
14use rpc::AnyProtoClient;
15use std::{
16 ops::{ControlFlow, Range},
17 sync::Arc,
18};
19use sum_tree::{Bias, SumTree};
20use time::OffsetDateTime;
21use util::{post_inc, ResultExt as _, TryFutureExt};
22
23pub struct ChannelChat {
24 pub channel_id: ChannelId,
25 messages: SumTree<ChannelMessage>,
26 acknowledged_message_ids: HashSet<u64>,
27 channel_store: Model<ChannelStore>,
28 loaded_all_messages: bool,
29 last_acknowledged_id: Option<u64>,
30 next_pending_message_id: usize,
31 first_loaded_message_id: Option<u64>,
32 user_store: Model<UserStore>,
33 rpc: Arc<Client>,
34 outgoing_messages_lock: Arc<Mutex<()>>,
35 rng: StdRng,
36 _subscription: Subscription,
37}
38
39#[derive(Debug, PartialEq, Eq)]
40pub struct MessageParams {
41 pub text: String,
42 pub mentions: Vec<(Range<usize>, UserId)>,
43 pub reply_to_message_id: Option<u64>,
44}
45
46#[derive(Clone, Debug)]
47pub struct ChannelMessage {
48 pub id: ChannelMessageId,
49 pub body: String,
50 pub timestamp: OffsetDateTime,
51 pub sender: Arc<User>,
52 pub nonce: u128,
53 pub mentions: Vec<(Range<usize>, UserId)>,
54 pub reply_to_message_id: Option<u64>,
55 pub edited_at: Option<OffsetDateTime>,
56}
57
58#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
59pub enum ChannelMessageId {
60 Saved(u64),
61 Pending(usize),
62}
63
64impl From<ChannelMessageId> for Option<u64> {
65 fn from(val: ChannelMessageId) -> Self {
66 match val {
67 ChannelMessageId::Saved(id) => Some(id),
68 ChannelMessageId::Pending(_) => None,
69 }
70 }
71}
72
73#[derive(Clone, Debug, Default)]
74pub struct ChannelMessageSummary {
75 max_id: ChannelMessageId,
76 count: usize,
77}
78
79#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
80struct Count(usize);
81
82#[derive(Clone, Debug, PartialEq)]
83pub enum ChannelChatEvent {
84 MessagesUpdated {
85 old_range: Range<usize>,
86 new_count: usize,
87 },
88 UpdateMessage {
89 message_id: ChannelMessageId,
90 message_ix: usize,
91 },
92 NewMessage {
93 channel_id: ChannelId,
94 message_id: u64,
95 },
96}
97
98impl EventEmitter<ChannelChatEvent> for ChannelChat {}
99pub fn init(client: &AnyProtoClient) {
100 client.add_model_message_handler(ChannelChat::handle_message_sent);
101 client.add_model_message_handler(ChannelChat::handle_message_removed);
102 client.add_model_message_handler(ChannelChat::handle_message_updated);
103}
104
105impl ChannelChat {
106 pub async fn new(
107 channel: Arc<Channel>,
108 channel_store: Model<ChannelStore>,
109 user_store: Model<UserStore>,
110 client: Arc<Client>,
111 mut cx: AsyncAppContext,
112 ) -> Result<Model<Self>> {
113 let channel_id = channel.id;
114 let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
115
116 let response = client
117 .request(proto::JoinChannelChat {
118 channel_id: channel_id.0,
119 })
120 .await?;
121
122 let handle = cx.new_model(|cx| {
123 cx.on_release(Self::release).detach();
124 Self {
125 channel_id: channel.id,
126 user_store: user_store.clone(),
127 channel_store,
128 rpc: client.clone(),
129 outgoing_messages_lock: Default::default(),
130 messages: Default::default(),
131 acknowledged_message_ids: Default::default(),
132 loaded_all_messages: false,
133 next_pending_message_id: 0,
134 last_acknowledged_id: None,
135 rng: StdRng::from_entropy(),
136 first_loaded_message_id: None,
137 _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
138 }
139 })?;
140 Self::handle_loaded_messages(
141 handle.downgrade(),
142 user_store,
143 client,
144 response.messages,
145 response.done,
146 &mut cx,
147 )
148 .await?;
149 Ok(handle)
150 }
151
152 fn release(&mut self, _: &mut AppContext) {
153 self.rpc
154 .send(proto::LeaveChannelChat {
155 channel_id: self.channel_id.0,
156 })
157 .log_err();
158 }
159
160 pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
161 self.channel_store
162 .read(cx)
163 .channel_for_id(self.channel_id)
164 .cloned()
165 }
166
167 pub fn client(&self) -> &Arc<Client> {
168 &self.rpc
169 }
170
171 pub fn send_message(
172 &mut self,
173 message: MessageParams,
174 cx: &mut ModelContext<Self>,
175 ) -> Result<Task<Result<u64>>> {
176 if message.text.trim().is_empty() {
177 Err(anyhow!("message body can't be empty"))?;
178 }
179
180 let current_user = self
181 .user_store
182 .read(cx)
183 .current_user()
184 .ok_or_else(|| anyhow!("current_user is not present"))?;
185
186 let channel_id = self.channel_id;
187 let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
188 let nonce = self.rng.gen();
189 self.insert_messages(
190 SumTree::from_item(
191 ChannelMessage {
192 id: pending_id,
193 body: message.text.clone(),
194 sender: current_user,
195 timestamp: OffsetDateTime::now_utc(),
196 mentions: message.mentions.clone(),
197 nonce,
198 reply_to_message_id: message.reply_to_message_id,
199 edited_at: None,
200 },
201 &(),
202 ),
203 cx,
204 );
205 let user_store = self.user_store.clone();
206 let rpc = self.rpc.clone();
207 let outgoing_messages_lock = self.outgoing_messages_lock.clone();
208
209 // todo - handle messages that fail to send (e.g. >1024 chars)
210 Ok(cx.spawn(move |this, mut cx| async move {
211 let outgoing_message_guard = outgoing_messages_lock.lock().await;
212 let request = rpc.request(proto::SendChannelMessage {
213 channel_id: channel_id.0,
214 body: message.text,
215 nonce: Some(nonce.into()),
216 mentions: mentions_to_proto(&message.mentions),
217 reply_to_message_id: message.reply_to_message_id,
218 });
219 let response = request.await?;
220 drop(outgoing_message_guard);
221 let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
222 let id = response.id;
223 let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
224 this.update(&mut cx, |this, cx| {
225 this.insert_messages(SumTree::from_item(message, &()), cx);
226 if this.first_loaded_message_id.is_none() {
227 this.first_loaded_message_id = Some(id);
228 }
229 })?;
230 Ok(id)
231 }))
232 }
233
234 pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
235 let response = self.rpc.request(proto::RemoveChannelMessage {
236 channel_id: self.channel_id.0,
237 message_id: id,
238 });
239 cx.spawn(move |this, mut cx| async move {
240 response.await?;
241 this.update(&mut cx, |this, cx| {
242 this.message_removed(id, cx);
243 })?;
244 Ok(())
245 })
246 }
247
248 pub fn update_message(
249 &mut self,
250 id: u64,
251 message: MessageParams,
252 cx: &mut ModelContext<Self>,
253 ) -> Result<Task<Result<()>>> {
254 self.message_update(
255 ChannelMessageId::Saved(id),
256 message.text.clone(),
257 message.mentions.clone(),
258 Some(OffsetDateTime::now_utc()),
259 cx,
260 );
261
262 let nonce: u128 = self.rng.gen();
263
264 let request = self.rpc.request(proto::UpdateChannelMessage {
265 channel_id: self.channel_id.0,
266 message_id: id,
267 body: message.text,
268 nonce: Some(nonce.into()),
269 mentions: mentions_to_proto(&message.mentions),
270 });
271 Ok(cx.spawn(move |_, _| async move {
272 request.await?;
273 Ok(())
274 }))
275 }
276
277 pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Option<()>>> {
278 if self.loaded_all_messages {
279 return None;
280 }
281
282 let rpc = self.rpc.clone();
283 let user_store = self.user_store.clone();
284 let channel_id = self.channel_id;
285 let before_message_id = self.first_loaded_message_id()?;
286 Some(cx.spawn(move |this, mut cx| {
287 async move {
288 let response = rpc
289 .request(proto::GetChannelMessages {
290 channel_id: channel_id.0,
291 before_message_id,
292 })
293 .await?;
294 Self::handle_loaded_messages(
295 this,
296 user_store,
297 rpc,
298 response.messages,
299 response.done,
300 &mut cx,
301 )
302 .await?;
303
304 anyhow::Ok(())
305 }
306 .log_err()
307 }))
308 }
309
310 pub fn first_loaded_message_id(&mut self) -> Option<u64> {
311 self.first_loaded_message_id
312 }
313
314 /// Load a message by its id, if it's already stored locally.
315 pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
316 self.messages.iter().find(|message| match message.id {
317 ChannelMessageId::Saved(message_id) => message_id == id,
318 ChannelMessageId::Pending(_) => false,
319 })
320 }
321
322 /// Load all of the chat messages since a certain message id.
323 ///
324 /// For now, we always maintain a suffix of the channel's messages.
325 pub async fn load_history_since_message(
326 chat: Model<Self>,
327 message_id: u64,
328 mut cx: AsyncAppContext,
329 ) -> Option<usize> {
330 loop {
331 let step = chat
332 .update(&mut cx, |chat, cx| {
333 if let Some(first_id) = chat.first_loaded_message_id() {
334 if first_id <= message_id {
335 let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>(&());
336 let message_id = ChannelMessageId::Saved(message_id);
337 cursor.seek(&message_id, Bias::Left, &());
338 return ControlFlow::Break(
339 if cursor
340 .item()
341 .map_or(false, |message| message.id == message_id)
342 {
343 Some(cursor.start().1 .0)
344 } else {
345 None
346 },
347 );
348 }
349 }
350 ControlFlow::Continue(chat.load_more_messages(cx))
351 })
352 .log_err()?;
353 match step {
354 ControlFlow::Break(ix) => return ix,
355 ControlFlow::Continue(task) => task?.await?,
356 }
357 }
358 }
359
360 pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext<Self>) {
361 if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
362 if self
363 .last_acknowledged_id
364 .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
365 {
366 self.rpc
367 .send(proto::AckChannelMessage {
368 channel_id: self.channel_id.0,
369 message_id: latest_message_id,
370 })
371 .ok();
372 self.last_acknowledged_id = Some(latest_message_id);
373 self.channel_store.update(cx, |store, cx| {
374 store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
375 });
376 }
377 }
378 }
379
380 async fn handle_loaded_messages(
381 this: WeakModel<Self>,
382 user_store: Model<UserStore>,
383 rpc: Arc<Client>,
384 proto_messages: Vec<proto::ChannelMessage>,
385 loaded_all_messages: bool,
386 cx: &mut AsyncAppContext,
387 ) -> Result<()> {
388 let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
389
390 let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
391 let loaded_message_ids = this.update(cx, |this, _| {
392 let mut loaded_message_ids: HashSet<u64> = HashSet::default();
393 for message in loaded_messages.iter() {
394 if let Some(saved_message_id) = message.id.into() {
395 loaded_message_ids.insert(saved_message_id);
396 }
397 }
398 for message in this.messages.iter() {
399 if let Some(saved_message_id) = message.id.into() {
400 loaded_message_ids.insert(saved_message_id);
401 }
402 }
403 loaded_message_ids
404 })?;
405
406 let missing_ancestors = loaded_messages
407 .iter()
408 .filter_map(|message| {
409 if let Some(ancestor_id) = message.reply_to_message_id {
410 if !loaded_message_ids.contains(&ancestor_id) {
411 return Some(ancestor_id);
412 }
413 }
414 None
415 })
416 .collect::<Vec<_>>();
417
418 let loaded_ancestors = if missing_ancestors.is_empty() {
419 None
420 } else {
421 let response = rpc
422 .request(proto::GetChannelMessagesById {
423 message_ids: missing_ancestors,
424 })
425 .await?;
426 Some(messages_from_proto(response.messages, &user_store, cx).await?)
427 };
428 this.update(cx, |this, cx| {
429 this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
430 this.loaded_all_messages = loaded_all_messages;
431 this.insert_messages(loaded_messages, cx);
432 if let Some(loaded_ancestors) = loaded_ancestors {
433 this.insert_messages(loaded_ancestors, cx);
434 }
435 })?;
436
437 Ok(())
438 }
439
440 pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
441 let user_store = self.user_store.clone();
442 let rpc = self.rpc.clone();
443 let channel_id = self.channel_id;
444 cx.spawn(move |this, mut cx| {
445 async move {
446 let response = rpc
447 .request(proto::JoinChannelChat {
448 channel_id: channel_id.0,
449 })
450 .await?;
451 Self::handle_loaded_messages(
452 this.clone(),
453 user_store.clone(),
454 rpc.clone(),
455 response.messages,
456 response.done,
457 &mut cx,
458 )
459 .await?;
460
461 let pending_messages = this.update(&mut cx, |this, _| {
462 this.pending_messages().cloned().collect::<Vec<_>>()
463 })?;
464
465 for pending_message in pending_messages {
466 let request = rpc.request(proto::SendChannelMessage {
467 channel_id: channel_id.0,
468 body: pending_message.body,
469 mentions: mentions_to_proto(&pending_message.mentions),
470 nonce: Some(pending_message.nonce.into()),
471 reply_to_message_id: pending_message.reply_to_message_id,
472 });
473 let response = request.await?;
474 let message = ChannelMessage::from_proto(
475 response.message.ok_or_else(|| anyhow!("invalid message"))?,
476 &user_store,
477 &mut cx,
478 )
479 .await?;
480 this.update(&mut cx, |this, cx| {
481 this.insert_messages(SumTree::from_item(message, &()), cx);
482 })?;
483 }
484
485 anyhow::Ok(())
486 }
487 .log_err()
488 })
489 .detach();
490 }
491
492 pub fn message_count(&self) -> usize {
493 self.messages.summary().count
494 }
495
496 pub fn messages(&self) -> &SumTree<ChannelMessage> {
497 &self.messages
498 }
499
500 pub fn message(&self, ix: usize) -> &ChannelMessage {
501 let mut cursor = self.messages.cursor::<Count>(&());
502 cursor.seek(&Count(ix), Bias::Right, &());
503 cursor.item().unwrap()
504 }
505
506 pub fn acknowledge_message(&mut self, id: u64) {
507 if self.acknowledged_message_ids.insert(id) {
508 self.rpc
509 .send(proto::AckChannelMessage {
510 channel_id: self.channel_id.0,
511 message_id: id,
512 })
513 .ok();
514 }
515 }
516
517 pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
518 let mut cursor = self.messages.cursor::<Count>(&());
519 cursor.seek(&Count(range.start), Bias::Right, &());
520 cursor.take(range.len())
521 }
522
523 pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
524 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
525 cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
526 cursor
527 }
528
529 async fn handle_message_sent(
530 this: Model<Self>,
531 message: TypedEnvelope<proto::ChannelMessageSent>,
532 mut cx: AsyncAppContext,
533 ) -> Result<()> {
534 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
535 let message = message
536 .payload
537 .message
538 .ok_or_else(|| anyhow!("empty message"))?;
539 let message_id = message.id;
540
541 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
542 this.update(&mut cx, |this, cx| {
543 this.insert_messages(SumTree::from_item(message, &()), cx);
544 cx.emit(ChannelChatEvent::NewMessage {
545 channel_id: this.channel_id,
546 message_id,
547 })
548 })?;
549
550 Ok(())
551 }
552
553 async fn handle_message_removed(
554 this: Model<Self>,
555 message: TypedEnvelope<proto::RemoveChannelMessage>,
556 mut cx: AsyncAppContext,
557 ) -> Result<()> {
558 this.update(&mut cx, |this, cx| {
559 this.message_removed(message.payload.message_id, cx)
560 })?;
561 Ok(())
562 }
563
564 async fn handle_message_updated(
565 this: Model<Self>,
566 message: TypedEnvelope<proto::ChannelMessageUpdate>,
567 mut cx: AsyncAppContext,
568 ) -> Result<()> {
569 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
570 let message = message
571 .payload
572 .message
573 .ok_or_else(|| anyhow!("empty message"))?;
574
575 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
576
577 this.update(&mut cx, |this, cx| {
578 this.message_update(
579 message.id,
580 message.body,
581 message.mentions,
582 message.edited_at,
583 cx,
584 )
585 })?;
586 Ok(())
587 }
588
589 fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
590 if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
591 let nonces = messages
592 .cursor::<()>(&())
593 .map(|m| m.nonce)
594 .collect::<HashSet<_>>();
595
596 let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(&());
597 let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
598 let start_ix = old_cursor.start().1 .0;
599 let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
600 let removed_count = removed_messages.summary().count;
601 let new_count = messages.summary().count;
602 let end_ix = start_ix + removed_count;
603
604 new_messages.append(messages, &());
605
606 let mut ranges = Vec::<Range<usize>>::new();
607 if new_messages.last().unwrap().is_pending() {
608 new_messages.append(old_cursor.suffix(&()), &());
609 } else {
610 new_messages.append(
611 old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
612 &(),
613 );
614
615 while let Some(message) = old_cursor.item() {
616 let message_ix = old_cursor.start().1 .0;
617 if nonces.contains(&message.nonce) {
618 if ranges.last().map_or(false, |r| r.end == message_ix) {
619 ranges.last_mut().unwrap().end += 1;
620 } else {
621 ranges.push(message_ix..message_ix + 1);
622 }
623 } else {
624 new_messages.push(message.clone(), &());
625 }
626 old_cursor.next(&());
627 }
628 }
629
630 drop(old_cursor);
631 self.messages = new_messages;
632
633 for range in ranges.into_iter().rev() {
634 cx.emit(ChannelChatEvent::MessagesUpdated {
635 old_range: range,
636 new_count: 0,
637 });
638 }
639 cx.emit(ChannelChatEvent::MessagesUpdated {
640 old_range: start_ix..end_ix,
641 new_count,
642 });
643
644 cx.notify();
645 }
646 }
647
648 fn message_removed(&mut self, id: u64, cx: &mut ModelContext<Self>) {
649 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
650 let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
651 if let Some(item) = cursor.item() {
652 if item.id == ChannelMessageId::Saved(id) {
653 let deleted_message_ix = messages.summary().count;
654 cursor.next(&());
655 messages.append(cursor.suffix(&()), &());
656 drop(cursor);
657 self.messages = messages;
658
659 // If the message that was deleted was the last acknowledged message,
660 // replace the acknowledged message with an earlier one.
661 self.channel_store.update(cx, |store, _| {
662 let summary = self.messages.summary();
663 if summary.count == 0 {
664 store.set_acknowledged_message_id(self.channel_id, None);
665 } else if deleted_message_ix == summary.count {
666 if let ChannelMessageId::Saved(id) = summary.max_id {
667 store.set_acknowledged_message_id(self.channel_id, Some(id));
668 }
669 }
670 });
671
672 cx.emit(ChannelChatEvent::MessagesUpdated {
673 old_range: deleted_message_ix..deleted_message_ix + 1,
674 new_count: 0,
675 });
676 }
677 }
678 }
679
680 fn message_update(
681 &mut self,
682 id: ChannelMessageId,
683 body: String,
684 mentions: Vec<(Range<usize>, u64)>,
685 edited_at: Option<OffsetDateTime>,
686 cx: &mut ModelContext<Self>,
687 ) {
688 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
689 let mut messages = cursor.slice(&id, Bias::Left, &());
690 let ix = messages.summary().count;
691
692 if let Some(mut message_to_update) = cursor.item().cloned() {
693 message_to_update.body = body;
694 message_to_update.mentions = mentions;
695 message_to_update.edited_at = edited_at;
696 messages.push(message_to_update, &());
697 cursor.next(&());
698 }
699
700 messages.append(cursor.suffix(&()), &());
701 drop(cursor);
702 self.messages = messages;
703
704 cx.emit(ChannelChatEvent::UpdateMessage {
705 message_ix: ix,
706 message_id: id,
707 });
708
709 cx.notify();
710 }
711}
712
713async fn messages_from_proto(
714 proto_messages: Vec<proto::ChannelMessage>,
715 user_store: &Model<UserStore>,
716 cx: &mut AsyncAppContext,
717) -> Result<SumTree<ChannelMessage>> {
718 let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
719 let mut result = SumTree::default();
720 result.extend(messages, &());
721 Ok(result)
722}
723
724impl ChannelMessage {
725 pub async fn from_proto(
726 message: proto::ChannelMessage,
727 user_store: &Model<UserStore>,
728 cx: &mut AsyncAppContext,
729 ) -> Result<Self> {
730 let sender = user_store
731 .update(cx, |user_store, cx| {
732 user_store.get_user(message.sender_id, cx)
733 })?
734 .await?;
735
736 let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
737 if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
738 return Some(a);
739 }
740
741 None
742 });
743
744 Ok(ChannelMessage {
745 id: ChannelMessageId::Saved(message.id),
746 body: message.body,
747 mentions: message
748 .mentions
749 .into_iter()
750 .filter_map(|mention| {
751 let range = mention.range?;
752 Some((range.start as usize..range.end as usize, mention.user_id))
753 })
754 .collect(),
755 timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
756 sender,
757 nonce: message
758 .nonce
759 .ok_or_else(|| anyhow!("nonce is required"))?
760 .into(),
761 reply_to_message_id: message.reply_to_message_id,
762 edited_at,
763 })
764 }
765
766 pub fn is_pending(&self) -> bool {
767 matches!(self.id, ChannelMessageId::Pending(_))
768 }
769
770 pub async fn from_proto_vec(
771 proto_messages: Vec<proto::ChannelMessage>,
772 user_store: &Model<UserStore>,
773 cx: &mut AsyncAppContext,
774 ) -> Result<Vec<Self>> {
775 let unique_user_ids = proto_messages
776 .iter()
777 .map(|m| m.sender_id)
778 .collect::<HashSet<_>>()
779 .into_iter()
780 .collect();
781 user_store
782 .update(cx, |user_store, cx| {
783 user_store.get_users(unique_user_ids, cx)
784 })?
785 .await?;
786
787 let mut messages = Vec::with_capacity(proto_messages.len());
788 for message in proto_messages {
789 messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
790 }
791 Ok(messages)
792 }
793}
794
795pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
796 mentions
797 .iter()
798 .map(|(range, user_id)| proto::ChatMention {
799 range: Some(proto::Range {
800 start: range.start as u64,
801 end: range.end as u64,
802 }),
803 user_id: *user_id,
804 })
805 .collect()
806}
807
808impl sum_tree::Item for ChannelMessage {
809 type Summary = ChannelMessageSummary;
810
811 fn summary(&self, _cx: &()) -> Self::Summary {
812 ChannelMessageSummary {
813 max_id: self.id,
814 count: 1,
815 }
816 }
817}
818
819impl Default for ChannelMessageId {
820 fn default() -> Self {
821 Self::Saved(0)
822 }
823}
824
825impl sum_tree::Summary for ChannelMessageSummary {
826 type Context = ();
827
828 fn zero(_cx: &Self::Context) -> Self {
829 Default::default()
830 }
831
832 fn add_summary(&mut self, summary: &Self, _: &()) {
833 self.max_id = summary.max_id;
834 self.count += summary.count;
835 }
836}
837
838impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
839 fn zero(_cx: &()) -> Self {
840 Default::default()
841 }
842
843 fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
844 debug_assert!(summary.max_id > *self);
845 *self = summary.max_id;
846 }
847}
848
849impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
850 fn zero(_cx: &()) -> Self {
851 Default::default()
852 }
853
854 fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
855 self.0 += summary.count;
856 }
857}
858
859impl<'a> From<&'a str> for MessageParams {
860 fn from(value: &'a str) -> Self {
861 Self {
862 text: value.into(),
863 mentions: Vec::new(),
864 reply_to_message_id: None,
865 }
866 }
867}