channel.rs

  1use super::{
  2    proto,
  3    user::{User, UserStore},
  4    Client, Status, Subscription, TypedEnvelope,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use gpui::{
  8    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
  9};
 10use postage::prelude::Stream;
 11use rand::prelude::*;
 12use std::{
 13    collections::{HashMap, HashSet},
 14    mem,
 15    ops::Range,
 16    sync::Arc,
 17};
 18use sum_tree::{Bias, SumTree};
 19use time::OffsetDateTime;
 20use util::{post_inc, TryFutureExt};
 21
 22pub struct ChannelList {
 23    available_channels: Option<Vec<ChannelDetails>>,
 24    channels: HashMap<u64, WeakModelHandle<Channel>>,
 25    client: Arc<Client>,
 26    user_store: ModelHandle<UserStore>,
 27    _task: Task<Option<()>>,
 28}
 29
 30#[derive(Clone, Debug, PartialEq)]
 31pub struct ChannelDetails {
 32    pub id: u64,
 33    pub name: String,
 34}
 35
 36pub struct Channel {
 37    details: ChannelDetails,
 38    messages: SumTree<ChannelMessage>,
 39    loaded_all_messages: bool,
 40    next_pending_message_id: usize,
 41    user_store: ModelHandle<UserStore>,
 42    rpc: Arc<Client>,
 43    rng: StdRng,
 44    _subscription: Subscription,
 45}
 46
 47#[derive(Clone, Debug)]
 48pub struct ChannelMessage {
 49    pub id: ChannelMessageId,
 50    pub body: String,
 51    pub timestamp: OffsetDateTime,
 52    pub sender: Arc<User>,
 53    pub nonce: u128,
 54}
 55
 56#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 57pub enum ChannelMessageId {
 58    Saved(u64),
 59    Pending(usize),
 60}
 61
 62#[derive(Clone, Debug, Default)]
 63pub struct ChannelMessageSummary {
 64    max_id: ChannelMessageId,
 65    count: usize,
 66}
 67
 68#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
 69struct Count(usize);
 70
 71pub enum ChannelListEvent {}
 72
 73#[derive(Clone, Debug, PartialEq)]
 74pub enum ChannelEvent {
 75    MessagesUpdated {
 76        old_range: Range<usize>,
 77        new_count: usize,
 78    },
 79}
 80
 81impl Entity for ChannelList {
 82    type Event = ChannelListEvent;
 83}
 84
 85impl ChannelList {
 86    pub fn new(
 87        user_store: ModelHandle<UserStore>,
 88        rpc: Arc<Client>,
 89        cx: &mut ModelContext<Self>,
 90    ) -> Self {
 91        let _task = cx.spawn_weak(|this, mut cx| {
 92            let rpc = rpc.clone();
 93            async move {
 94                let mut status = rpc.status();
 95                while let Some((status, this)) = status.recv().await.zip(this.upgrade(&cx)) {
 96                    match status {
 97                        Status::Connected { .. } => {
 98                            let response = rpc
 99                                .request(proto::GetChannels {})
100                                .await
101                                .context("failed to fetch available channels")?;
102                            this.update(&mut cx, |this, cx| {
103                                this.available_channels =
104                                    Some(response.channels.into_iter().map(Into::into).collect());
105
106                                let mut to_remove = Vec::new();
107                                for (channel_id, channel) in &this.channels {
108                                    if let Some(channel) = channel.upgrade(cx) {
109                                        channel.update(cx, |channel, cx| channel.rejoin(cx))
110                                    } else {
111                                        to_remove.push(*channel_id);
112                                    }
113                                }
114
115                                for channel_id in to_remove {
116                                    this.channels.remove(&channel_id);
117                                }
118                                cx.notify();
119                            });
120                        }
121                        Status::SignedOut { .. } => {
122                            this.update(&mut cx, |this, cx| {
123                                this.available_channels = None;
124                                this.channels.clear();
125                                cx.notify();
126                            });
127                        }
128                        _ => {}
129                    }
130                }
131                Ok(())
132            }
133            .log_err()
134        });
135
136        Self {
137            available_channels: None,
138            channels: Default::default(),
139            user_store,
140            client: rpc,
141            _task,
142        }
143    }
144
145    pub fn available_channels(&self) -> Option<&[ChannelDetails]> {
146        self.available_channels.as_ref().map(Vec::as_slice)
147    }
148
149    pub fn get_channel(
150        &mut self,
151        id: u64,
152        cx: &mut MutableAppContext,
153    ) -> Option<ModelHandle<Channel>> {
154        if let Some(channel) = self.channels.get(&id).and_then(|c| c.upgrade(cx)) {
155            return Some(channel);
156        }
157
158        let channels = self.available_channels.as_ref()?;
159        let details = channels.iter().find(|details| details.id == id)?.clone();
160        let channel = cx.add_model(|cx| {
161            Channel::new(details, self.user_store.clone(), self.client.clone(), cx)
162        });
163        self.channels.insert(id, channel.downgrade());
164        Some(channel)
165    }
166}
167
168impl Entity for Channel {
169    type Event = ChannelEvent;
170
171    fn release(&mut self, cx: &mut MutableAppContext) {
172        let rpc = self.rpc.clone();
173        let channel_id = self.details.id;
174        cx.foreground()
175            .spawn(async move {
176                if let Err(error) = rpc.send(proto::LeaveChannel { channel_id }).await {
177                    log::error!("error leaving channel: {}", error);
178                };
179            })
180            .detach()
181    }
182}
183
184impl Channel {
185    pub fn new(
186        details: ChannelDetails,
187        user_store: ModelHandle<UserStore>,
188        rpc: Arc<Client>,
189        cx: &mut ModelContext<Self>,
190    ) -> Self {
191        let _subscription = rpc.subscribe_to_entity(details.id, cx, Self::handle_message_sent);
192
193        {
194            let user_store = user_store.clone();
195            let rpc = rpc.clone();
196            let channel_id = details.id;
197            cx.spawn(|channel, mut cx| {
198                async move {
199                    let response = rpc.request(proto::JoinChannel { channel_id }).await?;
200                    let messages =
201                        messages_from_proto(response.messages, &user_store, &mut cx).await?;
202                    let loaded_all_messages = response.done;
203
204                    channel.update(&mut cx, |channel, cx| {
205                        channel.insert_messages(messages, cx);
206                        channel.loaded_all_messages = loaded_all_messages;
207                    });
208
209                    Ok(())
210                }
211                .log_err()
212            })
213            .detach();
214        }
215
216        Self {
217            details,
218            user_store,
219            rpc,
220            messages: Default::default(),
221            loaded_all_messages: false,
222            next_pending_message_id: 0,
223            rng: StdRng::from_entropy(),
224            _subscription,
225        }
226    }
227
228    pub fn name(&self) -> &str {
229        &self.details.name
230    }
231
232    pub fn send_message(
233        &mut self,
234        body: String,
235        cx: &mut ModelContext<Self>,
236    ) -> Result<Task<Result<()>>> {
237        if body.is_empty() {
238            Err(anyhow!("message body can't be empty"))?;
239        }
240
241        let current_user = self
242            .user_store
243            .read(cx)
244            .current_user()
245            .ok_or_else(|| anyhow!("current_user is not present"))?;
246
247        let channel_id = self.details.id;
248        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
249        let nonce = self.rng.gen();
250        self.insert_messages(
251            SumTree::from_item(
252                ChannelMessage {
253                    id: pending_id,
254                    body: body.clone(),
255                    sender: current_user,
256                    timestamp: OffsetDateTime::now_utc(),
257                    nonce,
258                },
259                &(),
260            ),
261            cx,
262        );
263        let user_store = self.user_store.clone();
264        let rpc = self.rpc.clone();
265        Ok(cx.spawn(|this, mut cx| async move {
266            let request = rpc.request(proto::SendChannelMessage {
267                channel_id,
268                body,
269                nonce: Some(nonce.into()),
270            });
271            let response = request.await?;
272            let message = ChannelMessage::from_proto(
273                response.message.ok_or_else(|| anyhow!("invalid message"))?,
274                &user_store,
275                &mut cx,
276            )
277            .await?;
278            this.update(&mut cx, |this, cx| {
279                this.insert_messages(SumTree::from_item(message, &()), cx);
280                Ok(())
281            })
282        }))
283    }
284
285    pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> bool {
286        if !self.loaded_all_messages {
287            let rpc = self.rpc.clone();
288            let user_store = self.user_store.clone();
289            let channel_id = self.details.id;
290            if let Some(before_message_id) =
291                self.messages.first().and_then(|message| match message.id {
292                    ChannelMessageId::Saved(id) => Some(id),
293                    ChannelMessageId::Pending(_) => None,
294                })
295            {
296                cx.spawn(|this, mut cx| {
297                    async move {
298                        let response = rpc
299                            .request(proto::GetChannelMessages {
300                                channel_id,
301                                before_message_id,
302                            })
303                            .await?;
304                        let loaded_all_messages = response.done;
305                        let messages =
306                            messages_from_proto(response.messages, &user_store, &mut cx).await?;
307                        this.update(&mut cx, |this, cx| {
308                            this.loaded_all_messages = loaded_all_messages;
309                            this.insert_messages(messages, cx);
310                        });
311                        Ok(())
312                    }
313                    .log_err()
314                })
315                .detach();
316                return true;
317            }
318        }
319        false
320    }
321
322    pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
323        let user_store = self.user_store.clone();
324        let rpc = self.rpc.clone();
325        let channel_id = self.details.id;
326        cx.spawn(|this, mut cx| {
327            async move {
328                let response = rpc.request(proto::JoinChannel { channel_id }).await?;
329                let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
330                let loaded_all_messages = response.done;
331
332                let pending_messages = this.update(&mut cx, |this, cx| {
333                    if let Some((first_new_message, last_old_message)) =
334                        messages.first().zip(this.messages.last())
335                    {
336                        if first_new_message.id > last_old_message.id {
337                            let old_messages = mem::take(&mut this.messages);
338                            cx.emit(ChannelEvent::MessagesUpdated {
339                                old_range: 0..old_messages.summary().count,
340                                new_count: 0,
341                            });
342                            this.loaded_all_messages = loaded_all_messages;
343                        }
344                    }
345
346                    this.insert_messages(messages, cx);
347                    if loaded_all_messages {
348                        this.loaded_all_messages = loaded_all_messages;
349                    }
350
351                    this.pending_messages().cloned().collect::<Vec<_>>()
352                });
353
354                for pending_message in pending_messages {
355                    let request = rpc.request(proto::SendChannelMessage {
356                        channel_id,
357                        body: pending_message.body,
358                        nonce: Some(pending_message.nonce.into()),
359                    });
360                    let response = request.await?;
361                    let message = ChannelMessage::from_proto(
362                        response.message.ok_or_else(|| anyhow!("invalid message"))?,
363                        &user_store,
364                        &mut cx,
365                    )
366                    .await?;
367                    this.update(&mut cx, |this, cx| {
368                        this.insert_messages(SumTree::from_item(message, &()), cx);
369                    });
370                }
371
372                Ok(())
373            }
374            .log_err()
375        })
376        .detach();
377    }
378
379    pub fn message_count(&self) -> usize {
380        self.messages.summary().count
381    }
382
383    pub fn messages(&self) -> &SumTree<ChannelMessage> {
384        &self.messages
385    }
386
387    pub fn message(&self, ix: usize) -> &ChannelMessage {
388        let mut cursor = self.messages.cursor::<Count>();
389        cursor.seek(&Count(ix), Bias::Right, &());
390        cursor.item().unwrap()
391    }
392
393    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
394        let mut cursor = self.messages.cursor::<Count>();
395        cursor.seek(&Count(range.start), Bias::Right, &());
396        cursor.take(range.len())
397    }
398
399    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
400        let mut cursor = self.messages.cursor::<ChannelMessageId>();
401        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
402        cursor
403    }
404
405    fn handle_message_sent(
406        &mut self,
407        message: TypedEnvelope<proto::ChannelMessageSent>,
408        _: Arc<Client>,
409        cx: &mut ModelContext<Self>,
410    ) -> Result<()> {
411        let user_store = self.user_store.clone();
412        let message = message
413            .payload
414            .message
415            .ok_or_else(|| anyhow!("empty message"))?;
416
417        cx.spawn(|this, mut cx| {
418            async move {
419                let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
420                this.update(&mut cx, |this, cx| {
421                    this.insert_messages(SumTree::from_item(message, &()), cx)
422                });
423                Ok(())
424            }
425            .log_err()
426        })
427        .detach();
428        Ok(())
429    }
430
431    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
432        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
433            let nonces = messages
434                .cursor::<()>()
435                .map(|m| m.nonce)
436                .collect::<HashSet<_>>();
437
438            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
439            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
440            let start_ix = old_cursor.start().1 .0;
441            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
442            let removed_count = removed_messages.summary().count;
443            let new_count = messages.summary().count;
444            let end_ix = start_ix + removed_count;
445
446            new_messages.push_tree(messages, &());
447
448            let mut ranges = Vec::<Range<usize>>::new();
449            if new_messages.last().unwrap().is_pending() {
450                new_messages.push_tree(old_cursor.suffix(&()), &());
451            } else {
452                new_messages.push_tree(
453                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
454                    &(),
455                );
456
457                while let Some(message) = old_cursor.item() {
458                    let message_ix = old_cursor.start().1 .0;
459                    if nonces.contains(&message.nonce) {
460                        if ranges.last().map_or(false, |r| r.end == message_ix) {
461                            ranges.last_mut().unwrap().end += 1;
462                        } else {
463                            ranges.push(message_ix..message_ix + 1);
464                        }
465                    } else {
466                        new_messages.push(message.clone(), &());
467                    }
468                    old_cursor.next(&());
469                }
470            }
471
472            drop(old_cursor);
473            self.messages = new_messages;
474
475            for range in ranges.into_iter().rev() {
476                cx.emit(ChannelEvent::MessagesUpdated {
477                    old_range: range,
478                    new_count: 0,
479                });
480            }
481            cx.emit(ChannelEvent::MessagesUpdated {
482                old_range: start_ix..end_ix,
483                new_count,
484            });
485            cx.notify();
486        }
487    }
488}
489
490async fn messages_from_proto(
491    proto_messages: Vec<proto::ChannelMessage>,
492    user_store: &ModelHandle<UserStore>,
493    cx: &mut AsyncAppContext,
494) -> Result<SumTree<ChannelMessage>> {
495    let unique_user_ids = proto_messages
496        .iter()
497        .map(|m| m.sender_id)
498        .collect::<HashSet<_>>()
499        .into_iter()
500        .collect();
501    user_store
502        .update(cx, |user_store, cx| {
503            user_store.load_users(unique_user_ids, cx)
504        })
505        .await?;
506
507    let mut messages = Vec::with_capacity(proto_messages.len());
508    for message in proto_messages {
509        messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
510    }
511    let mut result = SumTree::new();
512    result.extend(messages, &());
513    Ok(result)
514}
515
516impl From<proto::Channel> for ChannelDetails {
517    fn from(message: proto::Channel) -> Self {
518        Self {
519            id: message.id,
520            name: message.name,
521        }
522    }
523}
524
525impl ChannelMessage {
526    pub async fn from_proto(
527        message: proto::ChannelMessage,
528        user_store: &ModelHandle<UserStore>,
529        cx: &mut AsyncAppContext,
530    ) -> Result<Self> {
531        let sender = user_store
532            .update(cx, |user_store, cx| {
533                user_store.fetch_user(message.sender_id, cx)
534            })
535            .await?;
536        Ok(ChannelMessage {
537            id: ChannelMessageId::Saved(message.id),
538            body: message.body,
539            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
540            sender,
541            nonce: message
542                .nonce
543                .ok_or_else(|| anyhow!("nonce is required"))?
544                .into(),
545        })
546    }
547
548    pub fn is_pending(&self) -> bool {
549        matches!(self.id, ChannelMessageId::Pending(_))
550    }
551}
552
553impl sum_tree::Item for ChannelMessage {
554    type Summary = ChannelMessageSummary;
555
556    fn summary(&self) -> Self::Summary {
557        ChannelMessageSummary {
558            max_id: self.id,
559            count: 1,
560        }
561    }
562}
563
564impl Default for ChannelMessageId {
565    fn default() -> Self {
566        Self::Saved(0)
567    }
568}
569
570impl sum_tree::Summary for ChannelMessageSummary {
571    type Context = ();
572
573    fn add_summary(&mut self, summary: &Self, _: &()) {
574        self.max_id = summary.max_id;
575        self.count += summary.count;
576    }
577}
578
579impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
580    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
581        debug_assert!(summary.max_id > *self);
582        *self = summary.max_id;
583    }
584}
585
586impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
587    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
588        self.0 += summary.count;
589    }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::{FakeHttpClient, FakeServer};
    use gpui::TestAppContext;
    use surf::http::Response;

    /// Drives a `ChannelList` and a `Channel` against a fake server: fetching
    /// the channel list, joining a channel, receiving a new message, and
    /// loading older messages.
    #[gpui::test]
    async fn test_channel_messages(mut cx: TestAppContext) {
        let user_id = 5;
        let mut client = Client::new();
        let http_client = FakeHttpClient::new(|_| async move { Ok(Response::new(404)) });
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

        let channel_list = cx.add_model(|cx| ChannelList::new(user_store, client.clone(), cx));
        channel_list.read_with(&cx, |list, _| assert_eq!(list.available_channels(), None));

        // The list asks for the available channels; answer with one channel.
        let channels_request = server.receive::<proto::GetChannels>().await.unwrap();
        server
            .respond(
                channels_request.receipt(),
                proto::GetChannelsResponse {
                    channels: vec![proto::Channel {
                        id: 5,
                        name: "the-channel".to_string(),
                    }],
                },
            )
            .await;
        channel_list.next_notification(&cx).await;
        channel_list.read_with(&cx, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: 5,
                    name: "the-channel".into(),
                }]
            )
        });

        // A request for user 5 arrives next; answer it.
        let users_request = server.receive::<proto::GetUsers>().await.unwrap();
        assert_eq!(users_request.payload.user_ids, vec![5]);
        server
            .respond(
                users_request.receipt(),
                proto::GetUsersResponse {
                    users: vec![proto::User {
                        id: 5,
                        github_login: "nathansobo".into(),
                        avatar_url: "http://avatar.com/nathansobo".into(),
                    }],
                },
            )
            .await;

        // Open the channel and answer its join request with two messages.
        let channel = channel_list
            .update(&mut cx, |list, cx| {
                let channel_id = list.available_channels().unwrap()[0].id;
                list.get_channel(channel_id, cx)
            })
            .unwrap();
        channel.read_with(&cx, |channel, _| assert!(channel.messages().is_empty()));
        let join_request = server.receive::<proto::JoinChannel>().await.unwrap();
        server
            .respond(
                join_request.receipt(),
                proto::JoinChannelResponse {
                    messages: vec![
                        proto::ChannelMessage {
                            id: 10,
                            body: "a".into(),
                            timestamp: 1000,
                            sender_id: 5,
                            nonce: Some(1.into()),
                        },
                        proto::ChannelMessage {
                            id: 11,
                            body: "b".into(),
                            timestamp: 1001,
                            sender_id: 6,
                            nonce: Some(2.into()),
                        },
                    ],
                    done: false,
                },
            )
            .await;

        // Only user 6 is requested for the joined messages.
        let mut users_request = server.receive::<proto::GetUsers>().await.unwrap();
        users_request.payload.user_ids.sort();
        assert_eq!(users_request.payload.user_ids, vec![6]);
        server
            .respond(
                users_request.receipt(),
                proto::GetUsersResponse {
                    users: vec![proto::User {
                        id: 6,
                        github_login: "maxbrunsfeld".into(),
                        avatar_url: "http://avatar.com/maxbrunsfeld".into(),
                    }],
                },
            )
            .await;

        assert_eq!(
            channel.next_event(&cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 0..0,
                new_count: 2,
            }
        );
        channel.read_with(&cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(0..2)
                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
                    .collect::<Vec<_>>(),
                &[
                    ("nathansobo".into(), "a".into()),
                    ("maxbrunsfeld".into(), "b".into())
                ]
            );
        });

        // The server pushes a new message from a user not seen before.
        server
            .send(proto::ChannelMessageSent {
                channel_id: channel.read_with(&cx, |channel, _| channel.details.id),
                message: Some(proto::ChannelMessage {
                    id: 12,
                    body: "c".into(),
                    timestamp: 1002,
                    sender_id: 7,
                    nonce: Some(3.into()),
                }),
            })
            .await;

        // That sender is requested before the message shows up.
        let users_request = server.receive::<proto::GetUsers>().await.unwrap();
        assert_eq!(users_request.payload.user_ids, vec![7]);
        server
            .respond(
                users_request.receipt(),
                proto::GetUsersResponse {
                    users: vec![proto::User {
                        id: 7,
                        github_login: "as-cii".into(),
                        avatar_url: "http://avatar.com/as-cii".into(),
                    }],
                },
            )
            .await;

        assert_eq!(
            channel.next_event(&cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 2..2,
                new_count: 1,
            }
        );
        channel.read_with(&cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(2..3)
                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
                    .collect::<Vec<_>>(),
                &[("as-cii".into(), "c".into())]
            )
        });

        // Ask for older messages; they are requested relative to message 10.
        channel.update(&mut cx, |channel, cx| {
            assert!(channel.load_more_messages(cx));
        });
        let history_request = server.receive::<proto::GetChannelMessages>().await.unwrap();
        assert_eq!(history_request.payload.channel_id, 5);
        assert_eq!(history_request.payload.before_message_id, 10);
        server
            .respond(
                history_request.receipt(),
                proto::GetChannelMessagesResponse {
                    done: true,
                    messages: vec![
                        proto::ChannelMessage {
                            id: 8,
                            body: "y".into(),
                            timestamp: 998,
                            sender_id: 5,
                            nonce: Some(4.into()),
                        },
                        proto::ChannelMessage {
                            id: 9,
                            body: "z".into(),
                            timestamp: 999,
                            sender_id: 6,
                            nonce: Some(5.into()),
                        },
                    ],
                },
            )
            .await;

        // The older messages land at the front of the list.
        assert_eq!(
            channel.next_event(&cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 0..0,
                new_count: 2,
            }
        );
        channel.read_with(&cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(0..2)
                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
                    .collect::<Vec<_>>(),
                &[
                    ("nathansobo".into(), "y".into()),
                    ("maxbrunsfeld".into(), "z".into())
                ]
            );
        });
    }
}