1use crate::{Channel, ChannelStore};
2use anyhow::{Result, anyhow};
3use client::{
4 ChannelId, Client, Subscription, TypedEnvelope, UserId, proto,
5 user::{User, UserStore},
6};
7use collections::HashSet;
8use futures::lock::Mutex;
9use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
10use rand::prelude::*;
11use rpc::AnyProtoClient;
12use std::{
13 ops::{ControlFlow, Range},
14 sync::Arc,
15};
16use sum_tree::{Bias, SumTree};
17use time::OffsetDateTime;
18use util::{ResultExt as _, TryFutureExt, post_inc};
19
/// Client-side state for the chat attached to a single channel.
///
/// Holds the locally loaded messages (saved and pending) together with the
/// handles used to issue requests for this channel.
pub struct ChannelChat {
    /// Id of the channel this chat belongs to.
    pub channel_id: ChannelId,
    /// Locally loaded messages, summarized by maximum id and count.
    messages: SumTree<ChannelMessage>,
    /// Ids already sent through `acknowledge_message`, so each id is sent at most once.
    acknowledged_message_ids: HashSet<u64>,
    channel_store: Entity<ChannelStore>,
    /// Set from the `done` flag of the most recent message-loading response.
    loaded_all_messages: bool,
    /// Most recent id sent by `acknowledge_last_message`.
    last_acknowledged_id: Option<u64>,
    /// Counter used to mint `ChannelMessageId::Pending` ids.
    next_pending_message_id: usize,
    /// Id passed as `before_message_id` when requesting older messages.
    first_loaded_message_id: Option<u64>,
    user_store: Entity<UserStore>,
    rpc: Arc<Client>,
    /// Held by `send_message` while its `SendChannelMessage` request is in flight.
    outgoing_messages_lock: Arc<Mutex<()>>,
    /// Source of message nonces.
    rng: StdRng,
    /// Entity subscription created in `new`; kept for as long as the chat exists.
    _subscription: Subscription,
}
35
/// Caller-supplied content for a message that is being sent or edited.
#[derive(Debug, PartialEq, Eq)]
pub struct MessageParams {
    /// Body text of the message.
    pub text: String,
    /// Mentioned users, each paired with a range into `text`.
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message being replied to, if any.
    pub reply_to_message_id: Option<u64>,
}
42
/// A single chat message, either saved (built from a proto message) or
/// pending (created locally by `send_message`).
#[derive(Clone, Debug)]
pub struct ChannelMessage {
    /// Saved or pending identifier; also the key the message tree is ordered by.
    pub id: ChannelMessageId,
    pub body: String,
    pub timestamp: OffsetDateTime,
    pub sender: Arc<User>,
    /// Random value attached when sending; `insert_messages` uses it to match
    /// a pending message with incoming messages carrying the same nonce.
    pub nonce: u128,
    /// Mentioned users, each paired with a range into `body`.
    pub mentions: Vec<(Range<usize>, UserId)>,
    /// Id of the message this one replies to, if any.
    pub reply_to_message_id: Option<u64>,
    /// Time of the last edit, if the message was edited.
    pub edited_at: Option<OffsetDateTime>,
}
54
/// Identifier of a chat message.
///
/// The derived ordering places every `Saved` id before every `Pending` id.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ChannelMessageId {
    /// Id taken from a proto message.
    Saved(u64),
    /// Locally assigned id for a message created by `send_message`.
    Pending(usize),
}
60
61impl From<ChannelMessageId> for Option<u64> {
62 fn from(val: ChannelMessageId) -> Self {
63 match val {
64 ChannelMessageId::Saved(id) => Some(id),
65 ChannelMessageId::Pending(_) => None,
66 }
67 }
68}
69
/// Sum-tree summary for a run of messages.
#[derive(Clone, Debug, Default)]
pub struct ChannelMessageSummary {
    /// Id of the last message folded into this summary.
    max_id: ChannelMessageId,
    /// Number of messages covered by this summary.
    count: usize,
}
75
/// Sum-tree dimension that accumulates the number of messages.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct Count(usize);
78
/// Events emitted by [`ChannelChat`] when its message list changes.
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelChatEvent {
    /// The messages at `old_range` were replaced by `new_count` messages.
    MessagesUpdated {
        old_range: Range<usize>,
        new_count: usize,
    },
    /// The message with `message_id`, located at index `message_ix`, was edited.
    UpdateMessage {
        message_id: ChannelMessageId,
        message_ix: usize,
    },
    /// A `ChannelMessageSent` message delivered a new message for this channel.
    NewMessage {
        channel_id: ChannelId,
        message_id: u64,
    },
}
94
// `ChannelChat` emits `ChannelChatEvent`s through its `Context`.
impl EventEmitter<ChannelChatEvent> for ChannelChat {}
/// Registers the entity message handlers for sent, removed, and updated
/// channel messages on `client`.
pub fn init(client: &AnyProtoClient) {
    client.add_entity_message_handler(ChannelChat::handle_message_sent);
    client.add_entity_message_handler(ChannelChat::handle_message_removed);
    client.add_entity_message_handler(ChannelChat::handle_message_updated);
}
101
102impl ChannelChat {
103 pub async fn new(
104 channel: Arc<Channel>,
105 channel_store: Entity<ChannelStore>,
106 user_store: Entity<UserStore>,
107 client: Arc<Client>,
108 cx: &mut AsyncApp,
109 ) -> Result<Entity<Self>> {
110 let channel_id = channel.id;
111 let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
112
113 let response = client
114 .request(proto::JoinChannelChat {
115 channel_id: channel_id.0,
116 })
117 .await?;
118
119 let handle = cx.new(|cx| {
120 cx.on_release(Self::release).detach();
121 Self {
122 channel_id: channel.id,
123 user_store: user_store.clone(),
124 channel_store,
125 rpc: client.clone(),
126 outgoing_messages_lock: Default::default(),
127 messages: Default::default(),
128 acknowledged_message_ids: Default::default(),
129 loaded_all_messages: false,
130 next_pending_message_id: 0,
131 last_acknowledged_id: None,
132 rng: StdRng::from_entropy(),
133 first_loaded_message_id: None,
134 _subscription: subscription.set_entity(&cx.entity(), &cx.to_async()),
135 }
136 })?;
137 Self::handle_loaded_messages(
138 handle.downgrade(),
139 user_store,
140 client,
141 response.messages,
142 response.done,
143 cx,
144 )
145 .await?;
146 Ok(handle)
147 }
148
149 fn release(&mut self, _: &mut App) {
150 self.rpc
151 .send(proto::LeaveChannelChat {
152 channel_id: self.channel_id.0,
153 })
154 .log_err();
155 }
156
157 pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
158 self.channel_store
159 .read(cx)
160 .channel_for_id(self.channel_id)
161 .cloned()
162 }
163
    /// Returns the client this chat uses for its requests.
    pub fn client(&self) -> &Arc<Client> {
        &self.rpc
    }
167
168 pub fn send_message(
169 &mut self,
170 message: MessageParams,
171 cx: &mut Context<Self>,
172 ) -> Result<Task<Result<u64>>> {
173 if message.text.trim().is_empty() {
174 Err(anyhow!("message body can't be empty"))?;
175 }
176
177 let current_user = self
178 .user_store
179 .read(cx)
180 .current_user()
181 .ok_or_else(|| anyhow!("current_user is not present"))?;
182
183 let channel_id = self.channel_id;
184 let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
185 let nonce = self.rng.r#gen();
186 self.insert_messages(
187 SumTree::from_item(
188 ChannelMessage {
189 id: pending_id,
190 body: message.text.clone(),
191 sender: current_user,
192 timestamp: OffsetDateTime::now_utc(),
193 mentions: message.mentions.clone(),
194 nonce,
195 reply_to_message_id: message.reply_to_message_id,
196 edited_at: None,
197 },
198 &(),
199 ),
200 cx,
201 );
202 let user_store = self.user_store.clone();
203 let rpc = self.rpc.clone();
204 let outgoing_messages_lock = self.outgoing_messages_lock.clone();
205
206 // todo - handle messages that fail to send (e.g. >1024 chars)
207 Ok(cx.spawn(async move |this, cx| {
208 let outgoing_message_guard = outgoing_messages_lock.lock().await;
209 let request = rpc.request(proto::SendChannelMessage {
210 channel_id: channel_id.0,
211 body: message.text,
212 nonce: Some(nonce.into()),
213 mentions: mentions_to_proto(&message.mentions),
214 reply_to_message_id: message.reply_to_message_id,
215 });
216 let response = request.await?;
217 drop(outgoing_message_guard);
218 let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
219 let id = response.id;
220 let message = ChannelMessage::from_proto(response, &user_store, cx).await?;
221 this.update(cx, |this, cx| {
222 this.insert_messages(SumTree::from_item(message, &()), cx);
223 if this.first_loaded_message_id.is_none() {
224 this.first_loaded_message_id = Some(id);
225 }
226 })?;
227 Ok(id)
228 }))
229 }
230
231 pub fn remove_message(&mut self, id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
232 let response = self.rpc.request(proto::RemoveChannelMessage {
233 channel_id: self.channel_id.0,
234 message_id: id,
235 });
236 cx.spawn(async move |this, cx| {
237 response.await?;
238 this.update(cx, |this, cx| {
239 this.message_removed(id, cx);
240 })?;
241 Ok(())
242 })
243 }
244
245 pub fn update_message(
246 &mut self,
247 id: u64,
248 message: MessageParams,
249 cx: &mut Context<Self>,
250 ) -> Result<Task<Result<()>>> {
251 self.message_update(
252 ChannelMessageId::Saved(id),
253 message.text.clone(),
254 message.mentions.clone(),
255 Some(OffsetDateTime::now_utc()),
256 cx,
257 );
258
259 let nonce: u128 = self.rng.r#gen();
260
261 let request = self.rpc.request(proto::UpdateChannelMessage {
262 channel_id: self.channel_id.0,
263 message_id: id,
264 body: message.text,
265 nonce: Some(nonce.into()),
266 mentions: mentions_to_proto(&message.mentions),
267 });
268 Ok(cx.spawn(async move |_, _| {
269 request.await?;
270 Ok(())
271 }))
272 }
273
274 pub fn load_more_messages(&mut self, cx: &mut Context<Self>) -> Option<Task<Option<()>>> {
275 if self.loaded_all_messages {
276 return None;
277 }
278
279 let rpc = self.rpc.clone();
280 let user_store = self.user_store.clone();
281 let channel_id = self.channel_id;
282 let before_message_id = self.first_loaded_message_id()?;
283 Some(cx.spawn(async move |this, cx| {
284 async move {
285 let response = rpc
286 .request(proto::GetChannelMessages {
287 channel_id: channel_id.0,
288 before_message_id,
289 })
290 .await?;
291 Self::handle_loaded_messages(
292 this,
293 user_store,
294 rpc,
295 response.messages,
296 response.done,
297 cx,
298 )
299 .await?;
300
301 anyhow::Ok(())
302 }
303 .log_err()
304 .await
305 }))
306 }
307
308 pub fn first_loaded_message_id(&mut self) -> Option<u64> {
309 self.first_loaded_message_id
310 }
311
312 /// Load a message by its id, if it's already stored locally.
313 pub fn find_loaded_message(&self, id: u64) -> Option<&ChannelMessage> {
314 self.messages.iter().find(|message| match message.id {
315 ChannelMessageId::Saved(message_id) => message_id == id,
316 ChannelMessageId::Pending(_) => false,
317 })
318 }
319
320 /// Load all of the chat messages since a certain message id.
321 ///
322 /// For now, we always maintain a suffix of the channel's messages.
323 pub async fn load_history_since_message(
324 chat: Entity<Self>,
325 message_id: u64,
326 mut cx: AsyncApp,
327 ) -> Option<usize> {
328 loop {
329 let step = chat
330 .update(&mut cx, |chat, cx| {
331 if let Some(first_id) = chat.first_loaded_message_id() {
332 if first_id <= message_id {
333 let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>(&());
334 let message_id = ChannelMessageId::Saved(message_id);
335 cursor.seek(&message_id, Bias::Left, &());
336 return ControlFlow::Break(
337 if cursor
338 .item()
339 .map_or(false, |message| message.id == message_id)
340 {
341 Some(cursor.start().1.0)
342 } else {
343 None
344 },
345 );
346 }
347 }
348 ControlFlow::Continue(chat.load_more_messages(cx))
349 })
350 .log_err()?;
351 match step {
352 ControlFlow::Break(ix) => return ix,
353 ControlFlow::Continue(task) => task?.await?,
354 }
355 }
356 }
357
358 pub fn acknowledge_last_message(&mut self, cx: &mut Context<Self>) {
359 if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
360 if self
361 .last_acknowledged_id
362 .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
363 {
364 self.rpc
365 .send(proto::AckChannelMessage {
366 channel_id: self.channel_id.0,
367 message_id: latest_message_id,
368 })
369 .ok();
370 self.last_acknowledged_id = Some(latest_message_id);
371 self.channel_store.update(cx, |store, cx| {
372 store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
373 });
374 }
375 }
376 }
377
378 async fn handle_loaded_messages(
379 this: WeakEntity<Self>,
380 user_store: Entity<UserStore>,
381 rpc: Arc<Client>,
382 proto_messages: Vec<proto::ChannelMessage>,
383 loaded_all_messages: bool,
384 cx: &mut AsyncApp,
385 ) -> Result<()> {
386 let loaded_messages = messages_from_proto(proto_messages, &user_store, cx).await?;
387
388 let first_loaded_message_id = loaded_messages.first().map(|m| m.id);
389 let loaded_message_ids = this.update(cx, |this, _| {
390 let mut loaded_message_ids: HashSet<u64> = HashSet::default();
391 for message in loaded_messages.iter() {
392 if let Some(saved_message_id) = message.id.into() {
393 loaded_message_ids.insert(saved_message_id);
394 }
395 }
396 for message in this.messages.iter() {
397 if let Some(saved_message_id) = message.id.into() {
398 loaded_message_ids.insert(saved_message_id);
399 }
400 }
401 loaded_message_ids
402 })?;
403
404 let missing_ancestors = loaded_messages
405 .iter()
406 .filter_map(|message| {
407 if let Some(ancestor_id) = message.reply_to_message_id {
408 if !loaded_message_ids.contains(&ancestor_id) {
409 return Some(ancestor_id);
410 }
411 }
412 None
413 })
414 .collect::<Vec<_>>();
415
416 let loaded_ancestors = if missing_ancestors.is_empty() {
417 None
418 } else {
419 let response = rpc
420 .request(proto::GetChannelMessagesById {
421 message_ids: missing_ancestors,
422 })
423 .await?;
424 Some(messages_from_proto(response.messages, &user_store, cx).await?)
425 };
426 this.update(cx, |this, cx| {
427 this.first_loaded_message_id = first_loaded_message_id.and_then(|msg_id| msg_id.into());
428 this.loaded_all_messages = loaded_all_messages;
429 this.insert_messages(loaded_messages, cx);
430 if let Some(loaded_ancestors) = loaded_ancestors {
431 this.insert_messages(loaded_ancestors, cx);
432 }
433 })?;
434
435 Ok(())
436 }
437
438 pub fn rejoin(&mut self, cx: &mut Context<Self>) {
439 let user_store = self.user_store.clone();
440 let rpc = self.rpc.clone();
441 let channel_id = self.channel_id;
442 cx.spawn(async move |this, cx| {
443 async move {
444 let response = rpc
445 .request(proto::JoinChannelChat {
446 channel_id: channel_id.0,
447 })
448 .await?;
449 Self::handle_loaded_messages(
450 this.clone(),
451 user_store.clone(),
452 rpc.clone(),
453 response.messages,
454 response.done,
455 cx,
456 )
457 .await?;
458
459 let pending_messages = this.update(cx, |this, _| {
460 this.pending_messages().cloned().collect::<Vec<_>>()
461 })?;
462
463 for pending_message in pending_messages {
464 let request = rpc.request(proto::SendChannelMessage {
465 channel_id: channel_id.0,
466 body: pending_message.body,
467 mentions: mentions_to_proto(&pending_message.mentions),
468 nonce: Some(pending_message.nonce.into()),
469 reply_to_message_id: pending_message.reply_to_message_id,
470 });
471 let response = request.await?;
472 let message = ChannelMessage::from_proto(
473 response.message.ok_or_else(|| anyhow!("invalid message"))?,
474 &user_store,
475 cx,
476 )
477 .await?;
478 this.update(cx, |this, cx| {
479 this.insert_messages(SumTree::from_item(message, &()), cx);
480 })?;
481 }
482
483 anyhow::Ok(())
484 }
485 .log_err()
486 .await
487 })
488 .detach();
489 }
490
    /// Returns the number of locally stored messages (saved and pending).
    pub fn message_count(&self) -> usize {
        self.messages.summary().count
    }
494
    /// Returns the tree of locally stored messages.
    pub fn messages(&self) -> &SumTree<ChannelMessage> {
        &self.messages
    }
498
    /// Returns the message at index `ix`.
    ///
    /// # Panics
    ///
    /// Panics if the cursor has no item after seeking to `ix`
    /// (presumably when `ix` is out of bounds).
    pub fn message(&self, ix: usize) -> &ChannelMessage {
        let mut cursor = self.messages.cursor::<Count>(&());
        cursor.seek(&Count(ix), Bias::Right, &());
        cursor.item().unwrap()
    }
504
505 pub fn acknowledge_message(&mut self, id: u64) {
506 if self.acknowledged_message_ids.insert(id) {
507 self.rpc
508 .send(proto::AckChannelMessage {
509 channel_id: self.channel_id.0,
510 message_id: id,
511 })
512 .ok();
513 }
514 }
515
    /// Iterates over at most `range.len()` messages, starting at index `range.start`.
    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
        let mut cursor = self.messages.cursor::<Count>(&());
        cursor.seek(&Count(range.start), Bias::Right, &());
        cursor.take(range.len())
    }
521
    /// Iterates over the messages from the first pending id onward.
    ///
    /// `Pending` ids order after all `Saved` ids, so seeking to `Pending(0)`
    /// is expected to skip every saved message.
    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
        let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
        cursor
    }
527
528 async fn handle_message_sent(
529 this: Entity<Self>,
530 message: TypedEnvelope<proto::ChannelMessageSent>,
531 mut cx: AsyncApp,
532 ) -> Result<()> {
533 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
534 let message = message
535 .payload
536 .message
537 .ok_or_else(|| anyhow!("empty message"))?;
538 let message_id = message.id;
539
540 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
541 this.update(&mut cx, |this, cx| {
542 this.insert_messages(SumTree::from_item(message, &()), cx);
543 cx.emit(ChannelChatEvent::NewMessage {
544 channel_id: this.channel_id,
545 message_id,
546 })
547 })?;
548
549 Ok(())
550 }
551
552 async fn handle_message_removed(
553 this: Entity<Self>,
554 message: TypedEnvelope<proto::RemoveChannelMessage>,
555 mut cx: AsyncApp,
556 ) -> Result<()> {
557 this.update(&mut cx, |this, cx| {
558 this.message_removed(message.payload.message_id, cx)
559 })?;
560 Ok(())
561 }
562
563 async fn handle_message_updated(
564 this: Entity<Self>,
565 message: TypedEnvelope<proto::ChannelMessageUpdate>,
566 mut cx: AsyncApp,
567 ) -> Result<()> {
568 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
569 let message = message
570 .payload
571 .message
572 .ok_or_else(|| anyhow!("empty message"))?;
573
574 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
575
576 this.update(&mut cx, |this, cx| {
577 this.message_update(
578 message.id,
579 message.body,
580 message.mentions,
581 message.edited_at,
582 cx,
583 )
584 })?;
585 Ok(())
586 }
587
    /// Merges `messages` into the locally stored message tree and emits the
    /// corresponding `MessagesUpdated` events.
    ///
    /// Does nothing when `messages` is empty. Existing messages whose ids fall
    /// within the id range of `messages` are replaced by the incoming ones.
    /// When the merged run ends in a saved message, existing pending messages
    /// whose nonce matches an incoming message are dropped.
    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut Context<Self>) {
        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
            // Nonces of the incoming messages, used below to detect pending
            // messages that the incoming ones supersede.
            let nonces = messages
                .cursor::<()>(&())
                .map(|m| m.nonce)
                .collect::<HashSet<_>>();

            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(&());
            // Keep the existing messages that precede the first incoming id.
            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
            let start_ix = old_cursor.start().1.0;
            // Existing messages up to the last incoming id are replaced.
            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
            let removed_count = removed_messages.summary().count;
            let new_count = messages.summary().count;
            let end_ix = start_ix + removed_count;

            new_messages.append(messages, &());

            // Index ranges (in the old list) of pending messages dropped below.
            let mut ranges = Vec::<Range<usize>>::new();
            if new_messages.last().unwrap().is_pending() {
                // The incoming run ends in a pending message: keep the rest untouched.
                new_messages.append(old_cursor.suffix(&()), &());
            } else {
                // Keep the remaining saved messages, then walk the pending ones.
                new_messages.append(
                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
                    &(),
                );

                while let Some(message) = old_cursor.item() {
                    let message_ix = old_cursor.start().1.0;
                    if nonces.contains(&message.nonce) {
                        // Superseded by an incoming message: drop it and record its
                        // index, extending the previous range when adjacent.
                        if ranges.last().map_or(false, |r| r.end == message_ix) {
                            ranges.last_mut().unwrap().end += 1;
                        } else {
                            ranges.push(message_ix..message_ix + 1);
                        }
                    } else {
                        new_messages.push(message.clone(), &());
                    }
                    old_cursor.next(&());
                }
            }

            // The cursor borrows `self.messages`; release it before assigning.
            drop(old_cursor);
            self.messages = new_messages;

            // Report dropped pending ranges from the back first, then the
            // replaced range.
            for range in ranges.into_iter().rev() {
                cx.emit(ChannelChatEvent::MessagesUpdated {
                    old_range: range,
                    new_count: 0,
                });
            }
            cx.emit(ChannelChatEvent::MessagesUpdated {
                old_range: start_ix..end_ix,
                new_count,
            });

            cx.notify();
        }
    }
646
647 fn message_removed(&mut self, id: u64, cx: &mut Context<Self>) {
648 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
649 let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
650 if let Some(item) = cursor.item() {
651 if item.id == ChannelMessageId::Saved(id) {
652 let deleted_message_ix = messages.summary().count;
653 cursor.next(&());
654 messages.append(cursor.suffix(&()), &());
655 drop(cursor);
656 self.messages = messages;
657
658 // If the message that was deleted was the last acknowledged message,
659 // replace the acknowledged message with an earlier one.
660 self.channel_store.update(cx, |store, _| {
661 let summary = self.messages.summary();
662 if summary.count == 0 {
663 store.set_acknowledged_message_id(self.channel_id, None);
664 } else if deleted_message_ix == summary.count {
665 if let ChannelMessageId::Saved(id) = summary.max_id {
666 store.set_acknowledged_message_id(self.channel_id, Some(id));
667 }
668 }
669 });
670
671 cx.emit(ChannelChatEvent::MessagesUpdated {
672 old_range: deleted_message_ix..deleted_message_ix + 1,
673 new_count: 0,
674 });
675 }
676 }
677 }
678
679 fn message_update(
680 &mut self,
681 id: ChannelMessageId,
682 body: String,
683 mentions: Vec<(Range<usize>, u64)>,
684 edited_at: Option<OffsetDateTime>,
685 cx: &mut Context<Self>,
686 ) {
687 let mut cursor = self.messages.cursor::<ChannelMessageId>(&());
688 let mut messages = cursor.slice(&id, Bias::Left, &());
689 let ix = messages.summary().count;
690
691 if let Some(mut message_to_update) = cursor.item().cloned() {
692 message_to_update.body = body;
693 message_to_update.mentions = mentions;
694 message_to_update.edited_at = edited_at;
695 messages.push(message_to_update, &());
696 cursor.next(&());
697 }
698
699 messages.append(cursor.suffix(&()), &());
700 drop(cursor);
701 self.messages = messages;
702
703 cx.emit(ChannelChatEvent::UpdateMessage {
704 message_ix: ix,
705 message_id: id,
706 });
707
708 cx.notify();
709 }
710}
711
712async fn messages_from_proto(
713 proto_messages: Vec<proto::ChannelMessage>,
714 user_store: &Entity<UserStore>,
715 cx: &mut AsyncApp,
716) -> Result<SumTree<ChannelMessage>> {
717 let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
718 let mut result = SumTree::default();
719 result.extend(messages, &());
720 Ok(result)
721}
722
723impl ChannelMessage {
724 pub async fn from_proto(
725 message: proto::ChannelMessage,
726 user_store: &Entity<UserStore>,
727 cx: &mut AsyncApp,
728 ) -> Result<Self> {
729 let sender = user_store
730 .update(cx, |user_store, cx| {
731 user_store.get_user(message.sender_id, cx)
732 })?
733 .await?;
734
735 let edited_at = message.edited_at.and_then(|t| -> Option<OffsetDateTime> {
736 if let Ok(a) = OffsetDateTime::from_unix_timestamp(t as i64) {
737 return Some(a);
738 }
739
740 None
741 });
742
743 Ok(ChannelMessage {
744 id: ChannelMessageId::Saved(message.id),
745 body: message.body,
746 mentions: message
747 .mentions
748 .into_iter()
749 .filter_map(|mention| {
750 let range = mention.range?;
751 Some((range.start as usize..range.end as usize, mention.user_id))
752 })
753 .collect(),
754 timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
755 sender,
756 nonce: message
757 .nonce
758 .ok_or_else(|| anyhow!("nonce is required"))?
759 .into(),
760 reply_to_message_id: message.reply_to_message_id,
761 edited_at,
762 })
763 }
764
765 pub fn is_pending(&self) -> bool {
766 matches!(self.id, ChannelMessageId::Pending(_))
767 }
768
769 pub async fn from_proto_vec(
770 proto_messages: Vec<proto::ChannelMessage>,
771 user_store: &Entity<UserStore>,
772 cx: &mut AsyncApp,
773 ) -> Result<Vec<Self>> {
774 let unique_user_ids = proto_messages
775 .iter()
776 .map(|m| m.sender_id)
777 .collect::<HashSet<_>>()
778 .into_iter()
779 .collect();
780 user_store
781 .update(cx, |user_store, cx| {
782 user_store.get_users(unique_user_ids, cx)
783 })?
784 .await?;
785
786 let mut messages = Vec::with_capacity(proto_messages.len());
787 for message in proto_messages {
788 messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
789 }
790 Ok(messages)
791 }
792}
793
794pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
795 mentions
796 .iter()
797 .map(|(range, user_id)| proto::ChatMention {
798 range: Some(proto::Range {
799 start: range.start as u64,
800 end: range.end as u64,
801 }),
802 user_id: *user_id,
803 })
804 .collect()
805}
806
/// Each message summarizes to its own id and a count of one.
impl sum_tree::Item for ChannelMessage {
    type Summary = ChannelMessageSummary;

    fn summary(&self, _cx: &()) -> Self::Summary {
        ChannelMessageSummary {
            max_id: self.id,
            count: 1,
        }
    }
}
817
/// The default id is `Saved(0)`.
impl Default for ChannelMessageId {
    fn default() -> Self {
        Self::Saved(0)
    }
}
823
/// Summaries combine by taking the right-hand `max_id` and adding counts.
impl sum_tree::Summary for ChannelMessageSummary {
    type Context = ();

    fn zero(_cx: &Self::Context) -> Self {
        Default::default()
    }

    fn add_summary(&mut self, summary: &Self, _: &()) {
        // The incoming summary's id overwrites the current one rather than
        // being compared with it.
        self.max_id = summary.max_id;
        self.count += summary.count;
    }
}
836
/// Dimension that tracks the id of the last message accumulated so far.
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
    fn zero(_cx: &()) -> Self {
        Default::default()
    }

    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
        // Debug builds check that ids are strictly increasing along the tree.
        debug_assert!(summary.max_id > *self);
        *self = summary.max_id;
    }
}
847
/// Dimension that tracks how many messages have been accumulated so far.
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
    fn zero(_cx: &()) -> Self {
        Default::default()
    }

    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
        self.0 += summary.count;
    }
}
857
858impl<'a> From<&'a str> for MessageParams {
859 fn from(value: &'a str) -> Self {
860 Self {
861 text: value.into(),
862 mentions: Vec::new(),
863 reply_to_message_id: None,
864 }
865 }
866}