channel.rs

  1use super::{
  2    proto,
  3    user::{User, UserStore},
  4    Client, Status, Subscription, TypedEnvelope,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use futures::lock::Mutex;
  8use gpui::{
  9    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 10};
 11use postage::prelude::Stream;
 12use rand::prelude::*;
 13use std::{
 14    collections::{HashMap, HashSet},
 15    mem,
 16    ops::Range,
 17    sync::Arc,
 18};
 19use sum_tree::{Bias, SumTree};
 20use time::OffsetDateTime;
 21use util::{post_inc, ResultExt as _, TryFutureExt};
 22
 23pub struct ChannelList {
 24    available_channels: Option<Vec<ChannelDetails>>,
 25    channels: HashMap<u64, WeakModelHandle<Channel>>,
 26    client: Arc<Client>,
 27    user_store: ModelHandle<UserStore>,
 28    _task: Task<Option<()>>,
 29}
 30
 31#[derive(Clone, Debug, PartialEq)]
 32pub struct ChannelDetails {
 33    pub id: u64,
 34    pub name: String,
 35}
 36
 37pub struct Channel {
 38    details: ChannelDetails,
 39    messages: SumTree<ChannelMessage>,
 40    loaded_all_messages: bool,
 41    next_pending_message_id: usize,
 42    user_store: ModelHandle<UserStore>,
 43    rpc: Arc<Client>,
 44    outgoing_messages_lock: Arc<Mutex<()>>,
 45    rng: StdRng,
 46    _subscription: Subscription,
 47}
 48
 49#[derive(Clone, Debug)]
 50pub struct ChannelMessage {
 51    pub id: ChannelMessageId,
 52    pub body: String,
 53    pub timestamp: OffsetDateTime,
 54    pub sender: Arc<User>,
 55    pub nonce: u128,
 56}
 57
 58#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 59pub enum ChannelMessageId {
 60    Saved(u64),
 61    Pending(usize),
 62}
 63
 64#[derive(Clone, Debug, Default)]
 65pub struct ChannelMessageSummary {
 66    max_id: ChannelMessageId,
 67    count: usize,
 68}
 69
 70#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
 71struct Count(usize);
 72
 73pub enum ChannelListEvent {}
 74
 75#[derive(Clone, Debug, PartialEq)]
 76pub enum ChannelEvent {
 77    MessagesUpdated {
 78        old_range: Range<usize>,
 79        new_count: usize,
 80    },
 81}
 82
 83impl Entity for ChannelList {
 84    type Event = ChannelListEvent;
 85}
 86
 87impl ChannelList {
 88    pub fn new(
 89        user_store: ModelHandle<UserStore>,
 90        rpc: Arc<Client>,
 91        cx: &mut ModelContext<Self>,
 92    ) -> Self {
 93        let _task = cx.spawn_weak(|this, mut cx| {
 94            let rpc = rpc.clone();
 95            async move {
 96                let mut status = rpc.status();
 97                while let Some((status, this)) = status.recv().await.zip(this.upgrade(&cx)) {
 98                    match status {
 99                        Status::Connected { .. } => {
100                            let response = rpc
101                                .request(proto::GetChannels {})
102                                .await
103                                .context("failed to fetch available channels")?;
104                            this.update(&mut cx, |this, cx| {
105                                this.available_channels =
106                                    Some(response.channels.into_iter().map(Into::into).collect());
107
108                                let mut to_remove = Vec::new();
109                                for (channel_id, channel) in &this.channels {
110                                    if let Some(channel) = channel.upgrade(cx) {
111                                        channel.update(cx, |channel, cx| channel.rejoin(cx))
112                                    } else {
113                                        to_remove.push(*channel_id);
114                                    }
115                                }
116
117                                for channel_id in to_remove {
118                                    this.channels.remove(&channel_id);
119                                }
120                                cx.notify();
121                            });
122                        }
123                        Status::SignedOut { .. } => {
124                            this.update(&mut cx, |this, cx| {
125                                this.available_channels = None;
126                                this.channels.clear();
127                                cx.notify();
128                            });
129                        }
130                        _ => {}
131                    }
132                }
133                Ok(())
134            }
135            .log_err()
136        });
137
138        Self {
139            available_channels: None,
140            channels: Default::default(),
141            user_store,
142            client: rpc,
143            _task,
144        }
145    }
146
147    pub fn available_channels(&self) -> Option<&[ChannelDetails]> {
148        self.available_channels.as_ref().map(Vec::as_slice)
149    }
150
151    pub fn get_channel(
152        &mut self,
153        id: u64,
154        cx: &mut MutableAppContext,
155    ) -> Option<ModelHandle<Channel>> {
156        if let Some(channel) = self.channels.get(&id).and_then(|c| c.upgrade(cx)) {
157            return Some(channel);
158        }
159
160        let channels = self.available_channels.as_ref()?;
161        let details = channels.iter().find(|details| details.id == id)?.clone();
162        let channel = cx.add_model(|cx| {
163            Channel::new(details, self.user_store.clone(), self.client.clone(), cx)
164        });
165        self.channels.insert(id, channel.downgrade());
166        Some(channel)
167    }
168}
169
170impl Entity for Channel {
171    type Event = ChannelEvent;
172
173    fn release(&mut self, _: &mut MutableAppContext) {
174        self.rpc
175            .send(proto::LeaveChannel {
176                channel_id: self.details.id,
177            })
178            .log_err();
179    }
180}
181
182impl Channel {
183    pub fn new(
184        details: ChannelDetails,
185        user_store: ModelHandle<UserStore>,
186        rpc: Arc<Client>,
187        cx: &mut ModelContext<Self>,
188    ) -> Self {
189        let _subscription =
190            rpc.add_entity_message_handler(details.id, cx, Self::handle_message_sent);
191
192        {
193            let user_store = user_store.clone();
194            let rpc = rpc.clone();
195            let channel_id = details.id;
196            cx.spawn(|channel, mut cx| {
197                async move {
198                    let response = rpc.request(proto::JoinChannel { channel_id }).await?;
199                    let messages =
200                        messages_from_proto(response.messages, &user_store, &mut cx).await?;
201                    let loaded_all_messages = response.done;
202
203                    channel.update(&mut cx, |channel, cx| {
204                        channel.insert_messages(messages, cx);
205                        channel.loaded_all_messages = loaded_all_messages;
206                    });
207
208                    Ok(())
209                }
210                .log_err()
211            })
212            .detach();
213        }
214
215        Self {
216            details,
217            user_store,
218            rpc,
219            outgoing_messages_lock: Default::default(),
220            messages: Default::default(),
221            loaded_all_messages: false,
222            next_pending_message_id: 0,
223            rng: StdRng::from_entropy(),
224            _subscription,
225        }
226    }
227
228    pub fn name(&self) -> &str {
229        &self.details.name
230    }
231
232    pub fn send_message(
233        &mut self,
234        body: String,
235        cx: &mut ModelContext<Self>,
236    ) -> Result<Task<Result<()>>> {
237        if body.is_empty() {
238            Err(anyhow!("message body can't be empty"))?;
239        }
240
241        let current_user = self
242            .user_store
243            .read(cx)
244            .current_user()
245            .ok_or_else(|| anyhow!("current_user is not present"))?;
246
247        let channel_id = self.details.id;
248        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
249        let nonce = self.rng.gen();
250        self.insert_messages(
251            SumTree::from_item(
252                ChannelMessage {
253                    id: pending_id,
254                    body: body.clone(),
255                    sender: current_user,
256                    timestamp: OffsetDateTime::now_utc(),
257                    nonce,
258                },
259                &(),
260            ),
261            cx,
262        );
263        let user_store = self.user_store.clone();
264        let rpc = self.rpc.clone();
265        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
266        Ok(cx.spawn(|this, mut cx| async move {
267            let outgoing_message_guard = outgoing_messages_lock.lock().await;
268            let request = rpc.request(proto::SendChannelMessage {
269                channel_id,
270                body,
271                nonce: Some(nonce.into()),
272            });
273            let response = request.await?;
274            drop(outgoing_message_guard);
275            let message = ChannelMessage::from_proto(
276                response.message.ok_or_else(|| anyhow!("invalid message"))?,
277                &user_store,
278                &mut cx,
279            )
280            .await?;
281            this.update(&mut cx, |this, cx| {
282                this.insert_messages(SumTree::from_item(message, &()), cx);
283                Ok(())
284            })
285        }))
286    }
287
288    pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> bool {
289        if !self.loaded_all_messages {
290            let rpc = self.rpc.clone();
291            let user_store = self.user_store.clone();
292            let channel_id = self.details.id;
293            if let Some(before_message_id) =
294                self.messages.first().and_then(|message| match message.id {
295                    ChannelMessageId::Saved(id) => Some(id),
296                    ChannelMessageId::Pending(_) => None,
297                })
298            {
299                cx.spawn(|this, mut cx| {
300                    async move {
301                        let response = rpc
302                            .request(proto::GetChannelMessages {
303                                channel_id,
304                                before_message_id,
305                            })
306                            .await?;
307                        let loaded_all_messages = response.done;
308                        let messages =
309                            messages_from_proto(response.messages, &user_store, &mut cx).await?;
310                        this.update(&mut cx, |this, cx| {
311                            this.loaded_all_messages = loaded_all_messages;
312                            this.insert_messages(messages, cx);
313                        });
314                        Ok(())
315                    }
316                    .log_err()
317                })
318                .detach();
319                return true;
320            }
321        }
322        false
323    }
324
325    pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
326        let user_store = self.user_store.clone();
327        let rpc = self.rpc.clone();
328        let channel_id = self.details.id;
329        cx.spawn(|this, mut cx| {
330            async move {
331                let response = rpc.request(proto::JoinChannel { channel_id }).await?;
332                let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
333                let loaded_all_messages = response.done;
334
335                let pending_messages = this.update(&mut cx, |this, cx| {
336                    if let Some((first_new_message, last_old_message)) =
337                        messages.first().zip(this.messages.last())
338                    {
339                        if first_new_message.id > last_old_message.id {
340                            let old_messages = mem::take(&mut this.messages);
341                            cx.emit(ChannelEvent::MessagesUpdated {
342                                old_range: 0..old_messages.summary().count,
343                                new_count: 0,
344                            });
345                            this.loaded_all_messages = loaded_all_messages;
346                        }
347                    }
348
349                    this.insert_messages(messages, cx);
350                    if loaded_all_messages {
351                        this.loaded_all_messages = loaded_all_messages;
352                    }
353
354                    this.pending_messages().cloned().collect::<Vec<_>>()
355                });
356
357                for pending_message in pending_messages {
358                    let request = rpc.request(proto::SendChannelMessage {
359                        channel_id,
360                        body: pending_message.body,
361                        nonce: Some(pending_message.nonce.into()),
362                    });
363                    let response = request.await?;
364                    let message = ChannelMessage::from_proto(
365                        response.message.ok_or_else(|| anyhow!("invalid message"))?,
366                        &user_store,
367                        &mut cx,
368                    )
369                    .await?;
370                    this.update(&mut cx, |this, cx| {
371                        this.insert_messages(SumTree::from_item(message, &()), cx);
372                    });
373                }
374
375                Ok(())
376            }
377            .log_err()
378        })
379        .detach();
380    }
381
382    pub fn message_count(&self) -> usize {
383        self.messages.summary().count
384    }
385
386    pub fn messages(&self) -> &SumTree<ChannelMessage> {
387        &self.messages
388    }
389
390    pub fn message(&self, ix: usize) -> &ChannelMessage {
391        let mut cursor = self.messages.cursor::<Count>();
392        cursor.seek(&Count(ix), Bias::Right, &());
393        cursor.item().unwrap()
394    }
395
396    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
397        let mut cursor = self.messages.cursor::<Count>();
398        cursor.seek(&Count(range.start), Bias::Right, &());
399        cursor.take(range.len())
400    }
401
402    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
403        let mut cursor = self.messages.cursor::<ChannelMessageId>();
404        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
405        cursor
406    }
407
408    async fn handle_message_sent(
409        this: ModelHandle<Self>,
410        message: TypedEnvelope<proto::ChannelMessageSent>,
411        _: Arc<Client>,
412        mut cx: AsyncAppContext,
413    ) -> Result<()> {
414        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
415        let message = message
416            .payload
417            .message
418            .ok_or_else(|| anyhow!("empty message"))?;
419
420        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
421        this.update(&mut cx, |this, cx| {
422            this.insert_messages(SumTree::from_item(message, &()), cx)
423        });
424
425        Ok(())
426    }
427
428    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
429        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
430            let nonces = messages
431                .cursor::<()>()
432                .map(|m| m.nonce)
433                .collect::<HashSet<_>>();
434
435            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
436            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
437            let start_ix = old_cursor.start().1 .0;
438            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
439            let removed_count = removed_messages.summary().count;
440            let new_count = messages.summary().count;
441            let end_ix = start_ix + removed_count;
442
443            new_messages.push_tree(messages, &());
444
445            let mut ranges = Vec::<Range<usize>>::new();
446            if new_messages.last().unwrap().is_pending() {
447                new_messages.push_tree(old_cursor.suffix(&()), &());
448            } else {
449                new_messages.push_tree(
450                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
451                    &(),
452                );
453
454                while let Some(message) = old_cursor.item() {
455                    let message_ix = old_cursor.start().1 .0;
456                    if nonces.contains(&message.nonce) {
457                        if ranges.last().map_or(false, |r| r.end == message_ix) {
458                            ranges.last_mut().unwrap().end += 1;
459                        } else {
460                            ranges.push(message_ix..message_ix + 1);
461                        }
462                    } else {
463                        new_messages.push(message.clone(), &());
464                    }
465                    old_cursor.next(&());
466                }
467            }
468
469            drop(old_cursor);
470            self.messages = new_messages;
471
472            for range in ranges.into_iter().rev() {
473                cx.emit(ChannelEvent::MessagesUpdated {
474                    old_range: range,
475                    new_count: 0,
476                });
477            }
478            cx.emit(ChannelEvent::MessagesUpdated {
479                old_range: start_ix..end_ix,
480                new_count,
481            });
482            cx.notify();
483        }
484    }
485}
486
487async fn messages_from_proto(
488    proto_messages: Vec<proto::ChannelMessage>,
489    user_store: &ModelHandle<UserStore>,
490    cx: &mut AsyncAppContext,
491) -> Result<SumTree<ChannelMessage>> {
492    let unique_user_ids = proto_messages
493        .iter()
494        .map(|m| m.sender_id)
495        .collect::<HashSet<_>>()
496        .into_iter()
497        .collect();
498    user_store
499        .update(cx, |user_store, cx| {
500            user_store.load_users(unique_user_ids, cx)
501        })
502        .await?;
503
504    let mut messages = Vec::with_capacity(proto_messages.len());
505    for message in proto_messages {
506        messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
507    }
508    let mut result = SumTree::new();
509    result.extend(messages, &());
510    Ok(result)
511}
512
513impl From<proto::Channel> for ChannelDetails {
514    fn from(message: proto::Channel) -> Self {
515        Self {
516            id: message.id,
517            name: message.name,
518        }
519    }
520}
521
522impl ChannelMessage {
523    pub async fn from_proto(
524        message: proto::ChannelMessage,
525        user_store: &ModelHandle<UserStore>,
526        cx: &mut AsyncAppContext,
527    ) -> Result<Self> {
528        let sender = user_store
529            .update(cx, |user_store, cx| {
530                user_store.fetch_user(message.sender_id, cx)
531            })
532            .await?;
533        Ok(ChannelMessage {
534            id: ChannelMessageId::Saved(message.id),
535            body: message.body,
536            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
537            sender,
538            nonce: message
539                .nonce
540                .ok_or_else(|| anyhow!("nonce is required"))?
541                .into(),
542        })
543    }
544
545    pub fn is_pending(&self) -> bool {
546        matches!(self.id, ChannelMessageId::Pending(_))
547    }
548}
549
550impl sum_tree::Item for ChannelMessage {
551    type Summary = ChannelMessageSummary;
552
553    fn summary(&self) -> Self::Summary {
554        ChannelMessageSummary {
555            max_id: self.id,
556            count: 1,
557        }
558    }
559}
560
561impl Default for ChannelMessageId {
562    fn default() -> Self {
563        Self::Saved(0)
564    }
565}
566
567impl sum_tree::Summary for ChannelMessageSummary {
568    type Context = ();
569
570    fn add_summary(&mut self, summary: &Self, _: &()) {
571        self.max_id = summary.max_id;
572        self.count += summary.count;
573    }
574}
575
576impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
577    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
578        debug_assert!(summary.max_id > *self);
579        *self = summary.max_id;
580    }
581}
582
583impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
584    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
585        self.0 += summary.count;
586    }
587}
588
589#[cfg(test)]
590mod tests {
591    use super::*;
592    use crate::test::{FakeHttpClient, FakeServer};
593    use gpui::TestAppContext;
594    use surf::http::Response;
595
596    #[gpui::test]
597    async fn test_channel_messages(mut cx: TestAppContext) {
598        let user_id = 5;
599        let http_client = FakeHttpClient::new(|_| async move { Ok(Response::new(404)) });
600        let mut client = Client::new(http_client.clone());
601        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
602        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
603
604        let channel_list = cx.add_model(|cx| ChannelList::new(user_store, client.clone(), cx));
605        channel_list.read_with(&cx, |list, _| assert_eq!(list.available_channels(), None));
606
607        // Get the available channels.
608        let get_channels = server.receive::<proto::GetChannels>().await.unwrap();
609        server
610            .respond(
611                get_channels.receipt(),
612                proto::GetChannelsResponse {
613                    channels: vec![proto::Channel {
614                        id: 5,
615                        name: "the-channel".to_string(),
616                    }],
617                },
618            )
619            .await;
620        channel_list.next_notification(&cx).await;
621        channel_list.read_with(&cx, |list, _| {
622            assert_eq!(
623                list.available_channels().unwrap(),
624                &[ChannelDetails {
625                    id: 5,
626                    name: "the-channel".into(),
627                }]
628            )
629        });
630
631        let get_users = server.receive::<proto::GetUsers>().await.unwrap();
632        assert_eq!(get_users.payload.user_ids, vec![5]);
633        server
634            .respond(
635                get_users.receipt(),
636                proto::GetUsersResponse {
637                    users: vec![proto::User {
638                        id: 5,
639                        github_login: "nathansobo".into(),
640                        avatar_url: "http://avatar.com/nathansobo".into(),
641                    }],
642                },
643            )
644            .await;
645
646        // Join a channel and populate its existing messages.
647        let channel = channel_list
648            .update(&mut cx, |list, cx| {
649                let channel_id = list.available_channels().unwrap()[0].id;
650                list.get_channel(channel_id, cx)
651            })
652            .unwrap();
653        channel.read_with(&cx, |channel, _| assert!(channel.messages().is_empty()));
654        let join_channel = server.receive::<proto::JoinChannel>().await.unwrap();
655        server
656            .respond(
657                join_channel.receipt(),
658                proto::JoinChannelResponse {
659                    messages: vec![
660                        proto::ChannelMessage {
661                            id: 10,
662                            body: "a".into(),
663                            timestamp: 1000,
664                            sender_id: 5,
665                            nonce: Some(1.into()),
666                        },
667                        proto::ChannelMessage {
668                            id: 11,
669                            body: "b".into(),
670                            timestamp: 1001,
671                            sender_id: 6,
672                            nonce: Some(2.into()),
673                        },
674                    ],
675                    done: false,
676                },
677            )
678            .await;
679
680        // Client requests all users for the received messages
681        let mut get_users = server.receive::<proto::GetUsers>().await.unwrap();
682        get_users.payload.user_ids.sort();
683        assert_eq!(get_users.payload.user_ids, vec![6]);
684        server
685            .respond(
686                get_users.receipt(),
687                proto::GetUsersResponse {
688                    users: vec![proto::User {
689                        id: 6,
690                        github_login: "maxbrunsfeld".into(),
691                        avatar_url: "http://avatar.com/maxbrunsfeld".into(),
692                    }],
693                },
694            )
695            .await;
696
697        assert_eq!(
698            channel.next_event(&cx).await,
699            ChannelEvent::MessagesUpdated {
700                old_range: 0..0,
701                new_count: 2,
702            }
703        );
704        channel.read_with(&cx, |channel, _| {
705            assert_eq!(
706                channel
707                    .messages_in_range(0..2)
708                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
709                    .collect::<Vec<_>>(),
710                &[
711                    ("nathansobo".into(), "a".into()),
712                    ("maxbrunsfeld".into(), "b".into())
713                ]
714            );
715        });
716
717        // Receive a new message.
718        server.send(proto::ChannelMessageSent {
719            channel_id: channel.read_with(&cx, |channel, _| channel.details.id),
720            message: Some(proto::ChannelMessage {
721                id: 12,
722                body: "c".into(),
723                timestamp: 1002,
724                sender_id: 7,
725                nonce: Some(3.into()),
726            }),
727        });
728
729        // Client requests user for message since they haven't seen them yet
730        let get_users = server.receive::<proto::GetUsers>().await.unwrap();
731        assert_eq!(get_users.payload.user_ids, vec![7]);
732        server
733            .respond(
734                get_users.receipt(),
735                proto::GetUsersResponse {
736                    users: vec![proto::User {
737                        id: 7,
738                        github_login: "as-cii".into(),
739                        avatar_url: "http://avatar.com/as-cii".into(),
740                    }],
741                },
742            )
743            .await;
744
745        assert_eq!(
746            channel.next_event(&cx).await,
747            ChannelEvent::MessagesUpdated {
748                old_range: 2..2,
749                new_count: 1,
750            }
751        );
752        channel.read_with(&cx, |channel, _| {
753            assert_eq!(
754                channel
755                    .messages_in_range(2..3)
756                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
757                    .collect::<Vec<_>>(),
758                &[("as-cii".into(), "c".into())]
759            )
760        });
761
762        // Scroll up to view older messages.
763        channel.update(&mut cx, |channel, cx| {
764            assert!(channel.load_more_messages(cx));
765        });
766        let get_messages = server.receive::<proto::GetChannelMessages>().await.unwrap();
767        assert_eq!(get_messages.payload.channel_id, 5);
768        assert_eq!(get_messages.payload.before_message_id, 10);
769        server
770            .respond(
771                get_messages.receipt(),
772                proto::GetChannelMessagesResponse {
773                    done: true,
774                    messages: vec![
775                        proto::ChannelMessage {
776                            id: 8,
777                            body: "y".into(),
778                            timestamp: 998,
779                            sender_id: 5,
780                            nonce: Some(4.into()),
781                        },
782                        proto::ChannelMessage {
783                            id: 9,
784                            body: "z".into(),
785                            timestamp: 999,
786                            sender_id: 6,
787                            nonce: Some(5.into()),
788                        },
789                    ],
790                },
791            )
792            .await;
793
794        assert_eq!(
795            channel.next_event(&cx).await,
796            ChannelEvent::MessagesUpdated {
797                old_range: 0..0,
798                new_count: 2,
799            }
800        );
801        channel.read_with(&cx, |channel, _| {
802            assert_eq!(
803                channel
804                    .messages_in_range(0..2)
805                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
806                    .collect::<Vec<_>>(),
807                &[
808                    ("nathansobo".into(), "y".into()),
809                    ("maxbrunsfeld".into(), "z".into())
810                ]
811            );
812        });
813    }
814}