1use crate::{Channel, ChannelStore};
2use anyhow::{anyhow, Result};
3use client::{
4 proto,
5 user::{User, UserStore},
6 ChannelId, Client, Subscription, TypedEnvelope, UserId,
7};
8use collections::HashSet;
9use futures::lock::Mutex;
10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
11use rand::prelude::*;
12use rpc::AnyProtoClient;
13use std::{
14 ops::{ControlFlow, Range},
15 sync::Arc,
16};
17use sum_tree::{Bias, SumTree};
18use time::OffsetDateTime;
19use util::{post_inc, ResultExt as _, TryFutureExt};
20
21pub struct ChannelChat {
22 pub channel_id: ChannelId,
23 messages: SumTree<ChannelMessage>,
24 acknowledged_message_ids: HashSet<u64>,
25 channel_store: Entity<ChannelStore>,
26 loaded_all_messages: bool,
27 last_acknowledged_id: Option<u64>,
28 next_pending_message_id: usize,
29 first_loaded_message_id: Option<u64>,
30 user_store: Entity<UserStore>,
31 rpc: Arc<Client>,
32 outgoing_messages_lock: Arc<Mutex<()>>,
33 rng: StdRng,
34 _subscription: Subscription,
35}
36
37#[derive(Debug, PartialEq, Eq)]
38pub struct MessageParams {
39 pub text: String,
40 pub mentions: Vec<(Range<usize>, UserId)>,
41 pub reply_to_message_id: Option<u64>,
42}
43
44#[derive(Clone, Debug)]
45pub struct ChannelMessage {
46 pub id: ChannelMessageId,
47 pub body: String,
48 pub timestamp: OffsetDateTime,
49 pub sender: Arc<User>,
50 pub nonce: u128,
51 pub mentions: Vec<(Range<usize>, UserId)>,
52 pub reply_to_message_id: Option<u64>,
53 pub edited_at: Option<OffsetDateTime>,
54}
55
56#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
57pub enum ChannelMessageId {
58 Saved(u64),
59 Pending(usize),
60}
61
62impl From<ChannelMessageId> for Option<u64> {
63 fn from(val: ChannelMessageId) -> Self {
64 match val {
65 ChannelMessageId::Saved(id) => Some(id),
66 ChannelMessageId::Pending(_) => None,
67 }
68 }
69}
70
71#[derive(Clone, Debug, Default)]
72pub struct ChannelMessageSummary {
73 max_id: ChannelMessageId,
74 count: usize,
75}
76
77#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
78struct Count(usize);
79
80#[derive(Clone, Debug, PartialEq)]
81pub enum ChannelChatEvent {
82 MessagesUpdated {
83 old_range: Range<usize>,
84 new_count: usize,
85 },
86 UpdateMessage {
87 message_id: ChannelMessageId,
88 message_ix: usize,
89 },
90 NewMessage {
91 channel_id: ChannelId,
92 message_id: u64,
93 },
94}
95
96impl EventEmitter<ChannelChatEvent> for ChannelChat {}
97pub fn init(client: &AnyProtoClient) {
98 client.add_entity_message_handler(ChannelChat::handle_message_sent);
99 client.add_entity_message_handler(ChannelChat::handle_message_removed);
100 client.add_entity_message_handler(ChannelChat::handle_message_updated);
101}
102
103impl ChannelChat {
104 pub async fn new(
105 channel: Arc<Channel>,
106 channel_store: Entity<ChannelStore>,
107 user_store: Entity<UserStore>,
108 client: Arc<Client>,
109 mut cx: AsyncApp,
110 ) -> Result<Entity<Self>> {
111 let channel_id = channel.id;
112 let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
113
114 let response = client
115 .request(proto::JoinChannelChat {
116 channel_id: channel_id.0,
117 })
118 .await?;
119
120 let handle = cx.new(|cx| {
121 cx.on_release(Self::release).detach();
122 Self {
123 channel_id: channel.id,
124 user_store: user_store.clone(),
125 channel_store,
126 rpc: client.clone(),
127 outgoing_messages_lock: Default::default(),
128 messages: Default::default(),
129 acknowledged_message_ids: Default::default(),
130 loaded_all_messages: false,
131 next_pending_message_id: 0,
132 last_acknowledged_id: None,
133 rng: StdRng::from_entropy(),
134 first_loaded_message_id: None,
135 _subscription: subscription.set_entity(&cx.entity(), &mut cx.to_async()),
136 }
137 })?;
138 Self::handle_loaded_messages(
139 handle.downgrade(),
140 user_store,
141 client,
142 response.messages,
143 response.done,
144 &mut cx,
145 )
146 .await?;
147 Ok(handle)
148 }
149
150 fn release(&mut self, _: &mut App) {
151 self.rpc
152 .send(proto::LeaveChannelChat {
153 channel_id: self.channel_id.0,
154 })
155 .log_err();
156 }
157
158 pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
159 self.channel_store
160 .read(cx)
161 .channel_for_id(self.channel_id)
162 .cloned()
163 }
164
165 pub fn client(&self) -> &Arc<Client> {
166 &self.rpc
167 }
168
169 pub fn send_message(
170 &mut self,
171 message: MessageParams,
172 cx: &mut Context<Self>,
173 ) -> Result<Task<Result<u64>>> {
174 if message.text.trim().is_empty() {
175 Err(anyhow!("message body can't be empty"))?;
176 }
177
178 let current_user = self
179 .user_store
180 .read(cx)
181 .current_user()
182 .ok_or_else(|| anyhow!("current_user is not present"))?;
183
184 let channel_id = self.channel_id;
185 let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
186 let nonce = self.rng.gen();
187 self.insert_messages(
188 SumTree::from_item(
189 ChannelMessage {
190 id: pending_id,
191 body: message.text.clone(),
192 sender: current_user,
193 timestamp: OffsetDateTime::now_utc(),
194 mentions: message.mentions.clone(),
195 nonce,
196 reply_to_message_id: message.reply_to_message_id,
197 edited_at: None,
198 },
199 &(),
200 ),
201 cx,
202 );
203 let user_store = self.user_store.clone();
204 let rpc = self.rpc.clone();
205 let outgoing_messages_lock = self.outgoing_messages_lock.clone();
206
207 // todo - handle messages that fail to send (e.g. >1024 chars)
208 Ok(cx.spawn(move |this, mut cx| async move {
209 let outgoing_message_guard = outgoing_messages_lock.lock().await;
210 let request = rpc.request(proto::SendChannelMessage {
211 channel_id: channel_id.0,
212 body: message.text,
213 nonce: Some(nonce.into()),
214 mentions: mentions_to_proto(&message.mentions),
215 reply_to_message_id: message.reply_to_message_id,
216 });
217 let response = request.await?;
218 drop(outgoing_message_guard);
219 let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
220 let id = response.id;
221 let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
222 this.update(&mut cx, |this, cx| {
223 this.insert_messages(SumTree::from_item(message, &()), cx);
224 if this.first_loaded_message_id.is_none() {
225 this.first_loaded_message_id = Some(id);
226 }
227 })?;
228 Ok(id)
229 }))
230 }
231
232 pub fn remove_message(&mut self, id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
233 let response = self.rpc.request(proto::RemoveChannelMessage {
234 channel_id: self.channel_id.0,
235 message_id: id,
236 });
237 cx.spawn(move |this, mut cx| async move {
238 response.await?;
239 this.update(&mut cx, |this, cx| {
240 this.message_removed(id, cx);
241 })?;
242 Ok(())
243 })
244 }
245
246 pub fn update_message(
247 &mut self,
248 id: u64,
249 message: MessageParams,
250 cx: &mut Context<Self>,
251 ) -> Result<Task<Result<()>>> {
252 self.message_update(
253 ChannelMessageId::Saved(id),
254 message.text.clone(),
255 message.mentions.clone(),
256 Some(OffsetDateTime::now_utc()),
257 cx,
258 );
259
260 let nonce: u128 = self.rng.gen();
261
262 let request = self.rpc.request(proto::UpdateChannelMessage {
263 channel_id: self.channel_id.0,
264 message_id: id,
265 body: message.text,
266 nonce: Some(nonce.into()),
267 mentions: mentions_to_proto(&message.mentions),
268 });
269 Ok(cx.spawn(move |_, _| async move {
270 request.await?;
271 Ok(())
272 }))
273 }
274
275 pub fn load_more_messages(&mut self, cx: &mut Context<Self>) -> Option<Task<Option<()>>> {
276 if self.loaded_all_messages {
277 return None;
278 }
279
280 let rpc = self.rpc.clone();
281 let user_store = self.user_store.clone();
282 let channel_id = self.channel_id;
283 let before_message_id = self.first_loaded_message_id()?;
284 Some(cx.spawn(move |this, mut cx| {
285 async move {
286 let response = rpc
287 .request(proto::GetChannelMessages {
288 channel_id: channel_id.0,
289 before_message_id,
290 })
291 .await?;
292 Self::handle_loaded_messages(
293 this,
294 user_store,
295 rpc,
296 response.messages,
297 response.done,
298 &mut cx,
299 )
300 .await?;
301
302 anyhow::Ok(())
303 }
304 .log_err()
305 }))
306 }
307
308 pub fn first_loaded_message_id(&mut self) -> Option<u64> {
309 self.first_loaded_message_id
310 }
311
312 /// Load a message by its id, if it's already stored locally.
313 pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
314 self.messages.iter().find(|message| match message.id {
315 ChannelMessageId::Saved(message_id) => message_id == id,
316 ChannelMessageId::Pending(_) => false,
317 })
318 }
319
320 /// Load all of the chat messages since a certain message id.
321 ///
322 /// For now, we always maintain a suffix of the channel's messages.
323 pub async fn load_history_since_message(
324 chat: Entity<Self>,
325 message_id: u64,
326 mut cx: AsyncApp,
327 ) -> Option<usize> {
328 loop {
329 let step = chat
330 .update(&mut cx, |chat, cx| {
331 if let Some(first_id) = chat.first_loaded_message_id() {
332 if first_id <= message_id {
333 let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>(&());
334 let message_id = ChannelMessageId::Saved(message_id);
335 cursor.seek(&message_id, Bias::Left, &());
336 return ControlFlow::Break(
337 if cursor
338 .item()
339 .map_or(false, |message| message.id == message_id)
340 {
341 Some(cursor.start().1 .0)
342 } else {
343 None
344 },
345 );
346 }
347 }
348 ControlFlow::Continue(chat.load_more_messages(cx))
349 })
350 .log_err()?;
351 match step {
352 ControlFlow::Break(ix) => return ix,
353 ControlFlow::Continue(task) => task?.await?,
354 }
355 }
356 }
357
358 pub fn acknowledge_last_message(&mut self, cx: &mut Context<Self>) {
359 if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
360 if self
361 .last_acknowledged_id
362 .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
363 {
364 self.rpc
365 .send(proto::AckChannelMessage {
366 channel_id: self.channel_id.0,
367 message_id: latest_message_id,
368 })
369 .ok();
370 self.last_acknowledged_id = Some(latest_message_id);
371 self.channel_store.update(cx, |store, cx| {
372 store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
373 });
374 }
375 }
376 }
377
378 async fn handle_loaded_messages(
379 this: WeakEntity<Self>,
380 user_store: Entity<UserStore>,
381 rpc: Arc<Client>,
382 proto_messages: Vec<proto::ChannelMessage>,
383 loaded_all_messages: bool,
384 cx: &mut AsyncApp,
385 ) -> Result<()> {
386 let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
387
388 let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
389 let loaded_message_ids = this.update(cx, |this, _| {
390 let mut loaded_message_ids: HashSet<u64> = HashSet::default();
391 for message in loaded_messages.iter() {
392 if let Some(saved_message_id) = message.id.into() {
393 loaded_message_ids.insert(saved_message_id);
394 }
395 }
396 for message in this.messages.iter() {
397 if let Some(saved_message_id) = message.id.into() {
398 loaded_message_ids.insert(saved_message_id);
399 }
400 }
401 loaded_message_ids
402 })?;
403
404 let missing_ancestors = loaded_messages
405 .iter()
406 .filter_map(|message| {
407 if let Some(ancestor_id) = message.reply_to_message_id {
408 if !loaded_message_ids.contains(&ancestor_id) {
409 return Some(ancestor_id);
410 }
411 }
412 None
413 })
414 .collect::<Vec<_>>();
415
416 let loaded_ancestors = if missing_ancestors.is_empty() {
417 None
418 } else {
419 let response = rpc
420 .request(proto::GetChannelMessagesById {
421 message_ids: missing_ancestors,
422 })
423 .await?;
424 Some(messages_from_proto(response.messages, &user_store, cx).await?)
425 };
426 this.update(cx, |this, cx| {
427 this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
428 this.loaded_all_messages = loaded_all_messages;
429 this.insert_messages(loaded_messages, cx);
430 if let Some(loaded_ancestors) = loaded_ancestors {
431 this.insert_messages(loaded_ancestors, cx);
432 }
433 })?;
434
435 Ok(())
436 }
437
438 pub fn rejoin(&mut self, cx: &mut Context<Self>) {
439 let user_store = self.user_store.clone();
440 let rpc = self.rpc.clone();
441 let channel_id = self.channel_id;
442 cx.spawn(move |this, mut cx| {
443 async move {
444 let response = rpc
445 .request(proto::JoinChannelChat {
446 channel_id: channel_id.0,
447 })
448 .await?;
449 Self::handle_loaded_messages(
450 this.clone(),
451 user_store.clone(),
452 rpc.clone(),
453 response.messages,
454 response.done,
455 &mut cx,
456 )
457 .await?;
458
459 let pending_messages = this.update(&mut cx, |this, _| {
460 this.pending_messages().cloned().collect::<Vec<_>>()
461 })?;
462
463 for pending_message in pending_messages {
464 let request = rpc.request(proto::SendChannelMessage {
465 channel_id: channel_id.0,
466 body: pending_message.body,
467 mentions: mentions_to_proto(&pending_message.mentions),
468 nonce: Some(pending_message.nonce.into()),
469 reply_to_message_id: pending_message.reply_to_message_id,
470 });
471 let response = request.await?;
472 let message = ChannelMessage::from_proto(
473 response.message.ok_or_else(|| anyhow!("invalid message"))?,
474 &user_store,
475 &mut cx,
476 )
477 .await?;
478 this.update(&mut cx, |this, cx| {
479 this.insert_messages(SumTree::from_item(message, &()), cx);
480 })?;
481 }
482
483 anyhow::Ok(())
484 }
485 .log_err()
486 })
487 .detach();
488 }
489
490 pub fn message_count(&self) -> usize {
491 self.messages.summary().count
492 }
493
494 pub fn messages(&self) -> &SumTree<ChannelMessage> {
495 &self.messages
496 }
497
498 pub fn message(&self, ix: usize) -> &ChannelMessage {
499 let mut cursor = self.messages.cursor::<Count>(&());
500 cursor.seek(&Count(ix), Bias::Right, &());
501 cursor.item().unwrap()
502 }
503
504 pub fn acknowledge_message(&mut self, id: u64) {
505 if self.acknowledged_message_ids.insert(id) {
506 self.rpc
507 .send(proto::AckChannelMessage {
508 channel_id: self.channel_id.0,
509 message_id: id,
510 })
511 .ok();
512 }
513 }
514
515 pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
516 let mut cursor = self.messages.cursor::<Count>(&());
517 cursor.seek(&Count(range.start), Bias::Right, &());
518 cursor.take(range.len())
519 }
520
521 pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
522 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
523 cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
524 cursor
525 }
526
527 async fn handle_message_sent(
528 this: Entity<Self>,
529 message: TypedEnvelope<proto::ChannelMessageSent>,
530 mut cx: AsyncApp,
531 ) -> Result<()> {
532 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
533 let message = message
534 .payload
535 .message
536 .ok_or_else(|| anyhow!("empty message"))?;
537 let message_id = message.id;
538
539 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
540 this.update(&mut cx, |this, cx| {
541 this.insert_messages(SumTree::from_item(message, &()), cx);
542 cx.emit(ChannelChatEvent::NewMessage {
543 channel_id: this.channel_id,
544 message_id,
545 })
546 })?;
547
548 Ok(())
549 }
550
551 async fn handle_message_removed(
552 this: Entity<Self>,
553 message: TypedEnvelope<proto::RemoveChannelMessage>,
554 mut cx: AsyncApp,
555 ) -> Result<()> {
556 this.update(&mut cx, |this, cx| {
557 this.message_removed(message.payload.message_id, cx)
558 })?;
559 Ok(())
560 }
561
562 async fn handle_message_updated(
563 this: Entity<Self>,
564 message: TypedEnvelope<proto::ChannelMessageUpdate>,
565 mut cx: AsyncApp,
566 ) -> Result<()> {
567 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
568 let message = message
569 .payload
570 .message
571 .ok_or_else(|| anyhow!("empty message"))?;
572
573 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
574
575 this.update(&mut cx, |this, cx| {
576 this.message_update(
577 message.id,
578 message.body,
579 message.mentions,
580 message.edited_at,
581 cx,
582 )
583 })?;
584 Ok(())
585 }
586
587 fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut Context<Self>) {
588 if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
589 let nonces = messages
590 .cursor::<()>(&())
591 .map(|m| m.nonce)
592 .collect::<HashSet<_>>();
593
594 let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(&());
595 let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
596 let start_ix = old_cursor.start().1 .0;
597 let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
598 let removed_count = removed_messages.summary().count;
599 let new_count = messages.summary().count;
600 let end_ix = start_ix + removed_count;
601
602 new_messages.append(messages, &());
603
604 let mut ranges = Vec::<Range<usize>>::new();
605 if new_messages.last().unwrap().is_pending() {
606 new_messages.append(old_cursor.suffix(&()), &());
607 } else {
608 new_messages.append(
609 old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
610 &(),
611 );
612
613 while let Some(message) = old_cursor.item() {
614 let message_ix = old_cursor.start().1 .0;
615 if nonces.contains(&message.nonce) {
616 if ranges.last().map_or(false, |r| r.end == message_ix) {
617 ranges.last_mut().unwrap().end += 1;
618 } else {
619 ranges.push(message_ix..message_ix + 1);
620 }
621 } else {
622 new_messages.push(message.clone(), &());
623 }
624 old_cursor.next(&());
625 }
626 }
627
628 drop(old_cursor);
629 self.messages = new_messages;
630
631 for range in ranges.into_iter().rev() {
632 cx.emit(ChannelChatEvent::MessagesUpdated {
633 old_range: range,
634 new_count: 0,
635 });
636 }
637 cx.emit(ChannelChatEvent::MessagesUpdated {
638 old_range: start_ix..end_ix,
639 new_count,
640 });
641
642 cx.notify();
643 }
644 }
645
646 fn message_removed(&mut self, id: u64, cx: &mut Context<Self>) {
647 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
648 let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
649 if let Some(item) = cursor.item() {
650 if item.id == ChannelMessageId::Saved(id) {
651 let deleted_message_ix = messages.summary().count;
652 cursor.next(&());
653 messages.append(cursor.suffix(&()), &());
654 drop(cursor);
655 self.messages = messages;
656
657 // If the message that was deleted was the last acknowledged message,
658 // replace the acknowledged message with an earlier one.
659 self.channel_store.update(cx, |store, _| {
660 let summary = self.messages.summary();
661 if summary.count == 0 {
662 store.set_acknowledged_message_id(self.channel_id, None);
663 } else if deleted_message_ix == summary.count {
664 if let ChannelMessageId::Saved(id) = summary.max_id {
665 store.set_acknowledged_message_id(self.channel_id, Some(id));
666 }
667 }
668 });
669
670 cx.emit(ChannelChatEvent::MessagesUpdated {
671 old_range: deleted_message_ix..deleted_message_ix + 1,
672 new_count: 0,
673 });
674 }
675 }
676 }
677
678 fn message_update(
679 &mut self,
680 id: ChannelMessageId,
681 body: String,
682 mentions: Vec<(Range<usize>, u64)>,
683 edited_at: Option<OffsetDateTime>,
684 cx: &mut Context<Self>,
685 ) {
686 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
687 let mut messages = cursor.slice(&id, Bias::Left, &());
688 let ix = messages.summary().count;
689
690 if let Some(mut message_to_update) = cursor.item().cloned() {
691 message_to_update.body = body;
692 message_to_update.mentions = mentions;
693 message_to_update.edited_at = edited_at;
694 messages.push(message_to_update, &());
695 cursor.next(&());
696 }
697
698 messages.append(cursor.suffix(&()), &());
699 drop(cursor);
700 self.messages = messages;
701
702 cx.emit(ChannelChatEvent::UpdateMessage {
703 message_ix: ix,
704 message_id: id,
705 });
706
707 cx.notify();
708 }
709}
710
711async fn messages_from_proto(
712 proto_messages: Vec<proto::ChannelMessage>,
713 user_store: &Entity<UserStore>,
714 cx: &mut AsyncApp,
715) -> Result<SumTree<ChannelMessage>> {
716 let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
717 let mut result = SumTree::default();
718 result.extend(messages, &());
719 Ok(result)
720}
721
722impl ChannelMessage {
723 pub async fn from_proto(
724 message: proto::ChannelMessage,
725 user_store: &Entity<UserStore>,
726 cx: &mut AsyncApp,
727 ) -> Result<Self> {
728 let sender = user_store
729 .update(cx, |user_store, cx| {
730 user_store.get_user(message.sender_id, cx)
731 })?
732 .await?;
733
734 let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
735 if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
736 return Some(a);
737 }
738
739 None
740 });
741
742 Ok(ChannelMessage {
743 id: ChannelMessageId::Saved(message.id),
744 body: message.body,
745 mentions: message
746 .mentions
747 .into_iter()
748 .filter_map(|mention| {
749 let range = mention.range?;
750 Some((range.start as usize..range.end as usize, mention.user_id))
751 })
752 .collect(),
753 timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
754 sender,
755 nonce: message
756 .nonce
757 .ok_or_else(|| anyhow!("nonce is required"))?
758 .into(),
759 reply_to_message_id: message.reply_to_message_id,
760 edited_at,
761 })
762 }
763
764 pub fn is_pending(&self) -> bool {
765 matches!(self.id, ChannelMessageId::Pending(_))
766 }
767
768 pub async fn from_proto_vec(
769 proto_messages: Vec<proto::ChannelMessage>,
770 user_store: &Entity<UserStore>,
771 cx: &mut AsyncApp,
772 ) -> Result<Vec<Self>> {
773 let unique_user_ids = proto_messages
774 .iter()
775 .map(|m| m.sender_id)
776 .collect::<HashSet<_>>()
777 .into_iter()
778 .collect();
779 user_store
780 .update(cx, |user_store, cx| {
781 user_store.get_users(unique_user_ids, cx)
782 })?
783 .await?;
784
785 let mut messages = Vec::with_capacity(proto_messages.len());
786 for message in proto_messages {
787 messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
788 }
789 Ok(messages)
790 }
791}
792
793pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
794 mentions
795 .iter()
796 .map(|(range, user_id)| proto::ChatMention {
797 range: Some(proto::Range {
798 start: range.start as u64,
799 end: range.end as u64,
800 }),
801 user_id: *user_id,
802 })
803 .collect()
804}
805
806impl sum_tree::Item for ChannelMessage {
807 type Summary = ChannelMessageSummary;
808
809 fn summary(&self, _cx: &()) -> Self::Summary {
810 ChannelMessageSummary {
811 max_id: self.id,
812 count: 1,
813 }
814 }
815}
816
817impl Default for ChannelMessageId {
818 fn default() -> Self {
819 Self::Saved(0)
820 }
821}
822
823impl sum_tree::Summary for ChannelMessageSummary {
824 type Context = ();
825
826 fn zero(_cx: &Self::Context) -> Self {
827 Default::default()
828 }
829
830 fn add_summary(&mut self, summary: &Self, _: &()) {
831 self.max_id = summary.max_id;
832 self.count += summary.count;
833 }
834}
835
836impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
837 fn zero(_cx: &()) -> Self {
838 Default::default()
839 }
840
841 fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
842 debug_assert!(summary.max_id > *self);
843 *self = summary.max_id;
844 }
845}
846
847impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
848 fn zero(_cx: &()) -> Self {
849 Default::default()
850 }
851
852 fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
853 self.0 += summary.count;
854 }
855}
856
857impl<'a> From<&'a str> for MessageParams {
858 fn from(value: &'a str) -> Self {
859 Self {
860 text: value.into(),
861 mentions: Vec::new(),
862 reply_to_message_id: None,
863 }
864 }
865}