channel.rs

  1use super::{
  2    proto,
  3    user::{User, UserStore},
  4    Client, Status, Subscription, TypedEnvelope,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use futures::lock::Mutex;
  8use gpui::{
  9    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 10};
 11use postage::prelude::Stream;
 12use rand::prelude::*;
 13use std::{
 14    collections::{HashMap, HashSet},
 15    mem,
 16    ops::Range,
 17    sync::Arc,
 18};
 19use sum_tree::{Bias, SumTree};
 20use time::OffsetDateTime;
 21use util::{post_inc, ResultExt as _, TryFutureExt};
 22
 23pub struct ChannelList {
 24    available_channels: Option<Vec<ChannelDetails>>,
 25    channels: HashMap<u64, WeakModelHandle<Channel>>,
 26    client: Arc<Client>,
 27    user_store: ModelHandle<UserStore>,
 28    _task: Task<Option<()>>,
 29}
 30
 31#[derive(Clone, Debug, PartialEq)]
 32pub struct ChannelDetails {
 33    pub id: u64,
 34    pub name: String,
 35}
 36
 37pub struct Channel {
 38    details: ChannelDetails,
 39    messages: SumTree<ChannelMessage>,
 40    loaded_all_messages: bool,
 41    next_pending_message_id: usize,
 42    user_store: ModelHandle<UserStore>,
 43    rpc: Arc<Client>,
 44    outgoing_messages_lock: Arc<Mutex<()>>,
 45    rng: StdRng,
 46    _subscription: Subscription,
 47}
 48
 49#[derive(Clone, Debug)]
 50pub struct ChannelMessage {
 51    pub id: ChannelMessageId,
 52    pub body: String,
 53    pub timestamp: OffsetDateTime,
 54    pub sender: Arc<User>,
 55    pub nonce: u128,
 56}
 57
 58#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 59pub enum ChannelMessageId {
 60    Saved(u64),
 61    Pending(usize),
 62}
 63
 64#[derive(Clone, Debug, Default)]
 65pub struct ChannelMessageSummary {
 66    max_id: ChannelMessageId,
 67    count: usize,
 68}
 69
 70#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
 71struct Count(usize);
 72
 73pub enum ChannelListEvent {}
 74
 75#[derive(Clone, Debug, PartialEq)]
 76pub enum ChannelEvent {
 77    MessagesUpdated {
 78        old_range: Range<usize>,
 79        new_count: usize,
 80    },
 81}
 82
 83impl Entity for ChannelList {
 84    type Event = ChannelListEvent;
 85}
 86
 87impl ChannelList {
 88    pub fn new(
 89        user_store: ModelHandle<UserStore>,
 90        rpc: Arc<Client>,
 91        cx: &mut ModelContext<Self>,
 92    ) -> Self {
 93        let _task = cx.spawn_weak(|this, mut cx| {
 94            let rpc = rpc.clone();
 95            async move {
 96                let mut status = rpc.status();
 97                while let Some((status, this)) = status.recv().await.zip(this.upgrade(&cx)) {
 98                    match status {
 99                        Status::Connected { .. } => {
100                            let response = rpc
101                                .request(proto::GetChannels {})
102                                .await
103                                .context("failed to fetch available channels")?;
104                            this.update(&mut cx, |this, cx| {
105                                this.available_channels =
106                                    Some(response.channels.into_iter().map(Into::into).collect());
107
108                                let mut to_remove = Vec::new();
109                                for (channel_id, channel) in &this.channels {
110                                    if let Some(channel) = channel.upgrade(cx) {
111                                        channel.update(cx, |channel, cx| channel.rejoin(cx))
112                                    } else {
113                                        to_remove.push(*channel_id);
114                                    }
115                                }
116
117                                for channel_id in to_remove {
118                                    this.channels.remove(&channel_id);
119                                }
120                                cx.notify();
121                            });
122                        }
123                        Status::SignedOut { .. } => {
124                            this.update(&mut cx, |this, cx| {
125                                this.available_channels = None;
126                                this.channels.clear();
127                                cx.notify();
128                            });
129                        }
130                        _ => {}
131                    }
132                }
133                Ok(())
134            }
135            .log_err()
136        });
137
138        Self {
139            available_channels: None,
140            channels: Default::default(),
141            user_store,
142            client: rpc,
143            _task,
144        }
145    }
146
147    pub fn available_channels(&self) -> Option<&[ChannelDetails]> {
148        self.available_channels.as_deref()
149    }
150
151    pub fn get_channel(
152        &mut self,
153        id: u64,
154        cx: &mut MutableAppContext,
155    ) -> Option<ModelHandle<Channel>> {
156        if let Some(channel) = self.channels.get(&id).and_then(|c| c.upgrade(cx)) {
157            return Some(channel);
158        }
159
160        let channels = self.available_channels.as_ref()?;
161        let details = channels.iter().find(|details| details.id == id)?.clone();
162        let channel = cx.add_model(|cx| {
163            Channel::new(details, self.user_store.clone(), self.client.clone(), cx)
164        });
165        self.channels.insert(id, channel.downgrade());
166        Some(channel)
167    }
168}
169
170impl Entity for Channel {
171    type Event = ChannelEvent;
172
173    fn release(&mut self, _: &mut MutableAppContext) {
174        self.rpc
175            .send(proto::LeaveChannel {
176                channel_id: self.details.id,
177            })
178            .log_err();
179    }
180}
181
182impl Channel {
183    pub fn init(rpc: &Arc<Client>) {
184        rpc.add_model_message_handler(Self::handle_message_sent);
185    }
186
187    pub fn new(
188        details: ChannelDetails,
189        user_store: ModelHandle<UserStore>,
190        rpc: Arc<Client>,
191        cx: &mut ModelContext<Self>,
192    ) -> Self {
193        let _subscription = rpc.add_model_for_remote_entity(details.id, cx);
194
195        {
196            let user_store = user_store.clone();
197            let rpc = rpc.clone();
198            let channel_id = details.id;
199            cx.spawn(|channel, mut cx| {
200                async move {
201                    let response = rpc.request(proto::JoinChannel { channel_id }).await?;
202                    let messages =
203                        messages_from_proto(response.messages, &user_store, &mut cx).await?;
204                    let loaded_all_messages = response.done;
205
206                    channel.update(&mut cx, |channel, cx| {
207                        channel.insert_messages(messages, cx);
208                        channel.loaded_all_messages = loaded_all_messages;
209                    });
210
211                    Ok(())
212                }
213                .log_err()
214            })
215            .detach();
216        }
217
218        Self {
219            details,
220            user_store,
221            rpc,
222            outgoing_messages_lock: Default::default(),
223            messages: Default::default(),
224            loaded_all_messages: false,
225            next_pending_message_id: 0,
226            rng: StdRng::from_entropy(),
227            _subscription,
228        }
229    }
230
231    pub fn name(&self) -> &str {
232        &self.details.name
233    }
234
235    pub fn send_message(
236        &mut self,
237        body: String,
238        cx: &mut ModelContext<Self>,
239    ) -> Result<Task<Result<()>>> {
240        if body.is_empty() {
241            Err(anyhow!("message body can't be empty"))?;
242        }
243
244        let current_user = self
245            .user_store
246            .read(cx)
247            .current_user()
248            .ok_or_else(|| anyhow!("current_user is not present"))?;
249
250        let channel_id = self.details.id;
251        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
252        let nonce = self.rng.gen();
253        self.insert_messages(
254            SumTree::from_item(
255                ChannelMessage {
256                    id: pending_id,
257                    body: body.clone(),
258                    sender: current_user,
259                    timestamp: OffsetDateTime::now_utc(),
260                    nonce,
261                },
262                &(),
263            ),
264            cx,
265        );
266        let user_store = self.user_store.clone();
267        let rpc = self.rpc.clone();
268        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
269        Ok(cx.spawn(|this, mut cx| async move {
270            let outgoing_message_guard = outgoing_messages_lock.lock().await;
271            let request = rpc.request(proto::SendChannelMessage {
272                channel_id,
273                body,
274                nonce: Some(nonce.into()),
275            });
276            let response = request.await?;
277            drop(outgoing_message_guard);
278            let message = ChannelMessage::from_proto(
279                response.message.ok_or_else(|| anyhow!("invalid message"))?,
280                &user_store,
281                &mut cx,
282            )
283            .await?;
284            this.update(&mut cx, |this, cx| {
285                this.insert_messages(SumTree::from_item(message, &()), cx);
286                Ok(())
287            })
288        }))
289    }
290
291    pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> bool {
292        if !self.loaded_all_messages {
293            let rpc = self.rpc.clone();
294            let user_store = self.user_store.clone();
295            let channel_id = self.details.id;
296            if let Some(before_message_id) =
297                self.messages.first().and_then(|message| match message.id {
298                    ChannelMessageId::Saved(id) => Some(id),
299                    ChannelMessageId::Pending(_) => None,
300                })
301            {
302                cx.spawn(|this, mut cx| {
303                    async move {
304                        let response = rpc
305                            .request(proto::GetChannelMessages {
306                                channel_id,
307                                before_message_id,
308                            })
309                            .await?;
310                        let loaded_all_messages = response.done;
311                        let messages =
312                            messages_from_proto(response.messages, &user_store, &mut cx).await?;
313                        this.update(&mut cx, |this, cx| {
314                            this.loaded_all_messages = loaded_all_messages;
315                            this.insert_messages(messages, cx);
316                        });
317                        Ok(())
318                    }
319                    .log_err()
320                })
321                .detach();
322                return true;
323            }
324        }
325        false
326    }
327
328    pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
329        let user_store = self.user_store.clone();
330        let rpc = self.rpc.clone();
331        let channel_id = self.details.id;
332        cx.spawn(|this, mut cx| {
333            async move {
334                let response = rpc.request(proto::JoinChannel { channel_id }).await?;
335                let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
336                let loaded_all_messages = response.done;
337
338                let pending_messages = this.update(&mut cx, |this, cx| {
339                    if let Some((first_new_message, last_old_message)) =
340                        messages.first().zip(this.messages.last())
341                    {
342                        if first_new_message.id > last_old_message.id {
343                            let old_messages = mem::take(&mut this.messages);
344                            cx.emit(ChannelEvent::MessagesUpdated {
345                                old_range: 0..old_messages.summary().count,
346                                new_count: 0,
347                            });
348                            this.loaded_all_messages = loaded_all_messages;
349                        }
350                    }
351
352                    this.insert_messages(messages, cx);
353                    if loaded_all_messages {
354                        this.loaded_all_messages = loaded_all_messages;
355                    }
356
357                    this.pending_messages().cloned().collect::<Vec<_>>()
358                });
359
360                for pending_message in pending_messages {
361                    let request = rpc.request(proto::SendChannelMessage {
362                        channel_id,
363                        body: pending_message.body,
364                        nonce: Some(pending_message.nonce.into()),
365                    });
366                    let response = request.await?;
367                    let message = ChannelMessage::from_proto(
368                        response.message.ok_or_else(|| anyhow!("invalid message"))?,
369                        &user_store,
370                        &mut cx,
371                    )
372                    .await?;
373                    this.update(&mut cx, |this, cx| {
374                        this.insert_messages(SumTree::from_item(message, &()), cx);
375                    });
376                }
377
378                Ok(())
379            }
380            .log_err()
381        })
382        .detach();
383    }
384
385    pub fn message_count(&self) -> usize {
386        self.messages.summary().count
387    }
388
389    pub fn messages(&self) -> &SumTree<ChannelMessage> {
390        &self.messages
391    }
392
393    pub fn message(&self, ix: usize) -> &ChannelMessage {
394        let mut cursor = self.messages.cursor::<Count>();
395        cursor.seek(&Count(ix), Bias::Right, &());
396        cursor.item().unwrap()
397    }
398
399    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
400        let mut cursor = self.messages.cursor::<Count>();
401        cursor.seek(&Count(range.start), Bias::Right, &());
402        cursor.take(range.len())
403    }
404
405    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
406        let mut cursor = self.messages.cursor::<ChannelMessageId>();
407        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
408        cursor
409    }
410
411    async fn handle_message_sent(
412        this: ModelHandle<Self>,
413        message: TypedEnvelope<proto::ChannelMessageSent>,
414        _: Arc<Client>,
415        mut cx: AsyncAppContext,
416    ) -> Result<()> {
417        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
418        let message = message
419            .payload
420            .message
421            .ok_or_else(|| anyhow!("empty message"))?;
422
423        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
424        this.update(&mut cx, |this, cx| {
425            this.insert_messages(SumTree::from_item(message, &()), cx)
426        });
427
428        Ok(())
429    }
430
431    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
432        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
433            let nonces = messages
434                .cursor::<()>()
435                .map(|m| m.nonce)
436                .collect::<HashSet<_>>();
437
438            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
439            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
440            let start_ix = old_cursor.start().1 .0;
441            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
442            let removed_count = removed_messages.summary().count;
443            let new_count = messages.summary().count;
444            let end_ix = start_ix + removed_count;
445
446            new_messages.push_tree(messages, &());
447
448            let mut ranges = Vec::<Range<usize>>::new();
449            if new_messages.last().unwrap().is_pending() {
450                new_messages.push_tree(old_cursor.suffix(&()), &());
451            } else {
452                new_messages.push_tree(
453                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
454                    &(),
455                );
456
457                while let Some(message) = old_cursor.item() {
458                    let message_ix = old_cursor.start().1 .0;
459                    if nonces.contains(&message.nonce) {
460                        if ranges.last().map_or(false, |r| r.end == message_ix) {
461                            ranges.last_mut().unwrap().end += 1;
462                        } else {
463                            ranges.push(message_ix..message_ix + 1);
464                        }
465                    } else {
466                        new_messages.push(message.clone(), &());
467                    }
468                    old_cursor.next(&());
469                }
470            }
471
472            drop(old_cursor);
473            self.messages = new_messages;
474
475            for range in ranges.into_iter().rev() {
476                cx.emit(ChannelEvent::MessagesUpdated {
477                    old_range: range,
478                    new_count: 0,
479                });
480            }
481            cx.emit(ChannelEvent::MessagesUpdated {
482                old_range: start_ix..end_ix,
483                new_count,
484            });
485            cx.notify();
486        }
487    }
488}
489
490async fn messages_from_proto(
491    proto_messages: Vec<proto::ChannelMessage>,
492    user_store: &ModelHandle<UserStore>,
493    cx: &mut AsyncAppContext,
494) -> Result<SumTree<ChannelMessage>> {
495    let unique_user_ids = proto_messages
496        .iter()
497        .map(|m| m.sender_id)
498        .collect::<HashSet<_>>()
499        .into_iter()
500        .collect();
501    user_store
502        .update(cx, |user_store, cx| {
503            user_store.get_users(unique_user_ids, cx)
504        })
505        .await?;
506
507    let mut messages = Vec::with_capacity(proto_messages.len());
508    for message in proto_messages {
509        messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
510    }
511    let mut result = SumTree::new();
512    result.extend(messages, &());
513    Ok(result)
514}
515
516impl From<proto::Channel> for ChannelDetails {
517    fn from(message: proto::Channel) -> Self {
518        Self {
519            id: message.id,
520            name: message.name,
521        }
522    }
523}
524
525impl ChannelMessage {
526    pub async fn from_proto(
527        message: proto::ChannelMessage,
528        user_store: &ModelHandle<UserStore>,
529        cx: &mut AsyncAppContext,
530    ) -> Result<Self> {
531        let sender = user_store
532            .update(cx, |user_store, cx| {
533                user_store.get_user(message.sender_id, cx)
534            })
535            .await?;
536        Ok(ChannelMessage {
537            id: ChannelMessageId::Saved(message.id),
538            body: message.body,
539            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
540            sender,
541            nonce: message
542                .nonce
543                .ok_or_else(|| anyhow!("nonce is required"))?
544                .into(),
545        })
546    }
547
548    pub fn is_pending(&self) -> bool {
549        matches!(self.id, ChannelMessageId::Pending(_))
550    }
551}
552
553impl sum_tree::Item for ChannelMessage {
554    type Summary = ChannelMessageSummary;
555
556    fn summary(&self) -> Self::Summary {
557        ChannelMessageSummary {
558            max_id: self.id,
559            count: 1,
560        }
561    }
562}
563
564impl Default for ChannelMessageId {
565    fn default() -> Self {
566        Self::Saved(0)
567    }
568}
569
570impl sum_tree::Summary for ChannelMessageSummary {
571    type Context = ();
572
573    fn add_summary(&mut self, summary: &Self, _: &()) {
574        self.max_id = summary.max_id;
575        self.count += summary.count;
576    }
577}
578
579impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
580    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
581        debug_assert!(summary.max_id > *self);
582        *self = summary.max_id;
583    }
584}
585
586impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
587    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
588        self.0 += summary.count;
589    }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::{FakeHttpClient, FakeServer};
    use gpui::TestAppContext;

    /// Drives a channel through joining, receiving a message and loading older
    /// history against a fake server, checking the emitted events and the
    /// resulting message list after each step.
    #[gpui::test]
    async fn test_channel_messages(cx: &mut TestAppContext) {
        cx.foreground().forbid_parking();

        // Projection shared by the assertions on the channel's contents.
        let sender_and_body = |message: &ChannelMessage| {
            (message.sender.github_login.clone(), message.body.clone())
        };

        let user_id = 5;
        let http_client = FakeHttpClient::with_404_response();
        let client = cx.update(|cx| Client::new(http_client.clone(), cx));
        let server = FakeServer::for_client(user_id, &client, cx).await;

        Channel::init(&client);
        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

        let channel_list = cx.add_model(|cx| ChannelList::new(user_store, client.clone(), cx));
        channel_list.read_with(cx, |list, _| assert_eq!(list.available_channels(), None));

        // Get the available channels.
        let channels_request = server.receive::<proto::GetChannels>().await.unwrap();
        server
            .respond(
                channels_request.receipt(),
                proto::GetChannelsResponse {
                    channels: vec![proto::Channel {
                        id: 5,
                        name: "the-channel".to_string(),
                    }],
                },
            )
            .await;
        channel_list.next_notification(cx).await;
        channel_list.read_with(cx, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: 5,
                    name: "the-channel".into(),
                }]
            )
        });

        let current_user_request = server.receive::<proto::GetUsers>().await.unwrap();
        assert_eq!(current_user_request.payload.user_ids, vec![5]);
        server
            .respond(
                current_user_request.receipt(),
                proto::UsersResponse {
                    users: vec![proto::User {
                        id: 5,
                        github_login: "nathansobo".into(),
                        avatar_url: "http://avatar.com/nathansobo".into(),
                    }],
                },
            )
            .await;

        // Join a channel and populate its existing messages.
        let channel = channel_list
            .update(cx, |list, cx| {
                let channel_id = list.available_channels().unwrap()[0].id;
                list.get_channel(channel_id, cx)
            })
            .unwrap();
        channel.read_with(cx, |channel, _| assert!(channel.messages().is_empty()));
        let join_request = server.receive::<proto::JoinChannel>().await.unwrap();
        server
            .respond(
                join_request.receipt(),
                proto::JoinChannelResponse {
                    messages: vec![
                        proto::ChannelMessage {
                            id: 10,
                            body: "a".into(),
                            timestamp: 1000,
                            sender_id: 5,
                            nonce: Some(1.into()),
                        },
                        proto::ChannelMessage {
                            id: 11,
                            body: "b".into(),
                            timestamp: 1001,
                            sender_id: 6,
                            nonce: Some(2.into()),
                        },
                    ],
                    done: false,
                },
            )
            .await;

        // Client requests all users for the received messages
        let mut senders_request = server.receive::<proto::GetUsers>().await.unwrap();
        senders_request.payload.user_ids.sort();
        assert_eq!(senders_request.payload.user_ids, vec![6]);
        server
            .respond(
                senders_request.receipt(),
                proto::UsersResponse {
                    users: vec![proto::User {
                        id: 6,
                        github_login: "maxbrunsfeld".into(),
                        avatar_url: "http://avatar.com/maxbrunsfeld".into(),
                    }],
                },
            )
            .await;

        assert_eq!(
            channel.next_event(cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 0..0,
                new_count: 2,
            }
        );
        channel.read_with(cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(0..2)
                    .map(sender_and_body)
                    .collect::<Vec<_>>(),
                &[
                    ("nathansobo".into(), "a".into()),
                    ("maxbrunsfeld".into(), "b".into())
                ]
            );
        });

        // Receive a new message.
        server.send(proto::ChannelMessageSent {
            channel_id: channel.read_with(cx, |channel, _| channel.details.id),
            message: Some(proto::ChannelMessage {
                id: 12,
                body: "c".into(),
                timestamp: 1002,
                sender_id: 7,
                nonce: Some(3.into()),
            }),
        });

        // Client requests user for message since they haven't seen them yet
        let new_sender_request = server.receive::<proto::GetUsers>().await.unwrap();
        assert_eq!(new_sender_request.payload.user_ids, vec![7]);
        server
            .respond(
                new_sender_request.receipt(),
                proto::UsersResponse {
                    users: vec![proto::User {
                        id: 7,
                        github_login: "as-cii".into(),
                        avatar_url: "http://avatar.com/as-cii".into(),
                    }],
                },
            )
            .await;

        assert_eq!(
            channel.next_event(cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 2..2,
                new_count: 1,
            }
        );
        channel.read_with(cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(2..3)
                    .map(sender_and_body)
                    .collect::<Vec<_>>(),
                &[("as-cii".into(), "c".into())]
            )
        });

        // Scroll up to view older messages.
        channel.update(cx, |channel, cx| {
            assert!(channel.load_more_messages(cx));
        });
        let history_request = server.receive::<proto::GetChannelMessages>().await.unwrap();
        assert_eq!(history_request.payload.channel_id, 5);
        assert_eq!(history_request.payload.before_message_id, 10);
        server
            .respond(
                history_request.receipt(),
                proto::GetChannelMessagesResponse {
                    done: true,
                    messages: vec![
                        proto::ChannelMessage {
                            id: 8,
                            body: "y".into(),
                            timestamp: 998,
                            sender_id: 5,
                            nonce: Some(4.into()),
                        },
                        proto::ChannelMessage {
                            id: 9,
                            body: "z".into(),
                            timestamp: 999,
                            sender_id: 6,
                            nonce: Some(5.into()),
                        },
                    ],
                },
            )
            .await;

        assert_eq!(
            channel.next_event(cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 0..0,
                new_count: 2,
            }
        );
        channel.read_with(cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(0..2)
                    .map(sender_and_body)
                    .collect::<Vec<_>>(),
                &[
                    ("nathansobo".into(), "y".into()),
                    ("maxbrunsfeld".into(), "z".into())
                ]
            );
        });
    }
}