Save messages received from the server

Antonio Scandurra created

Change summary

server/src/tests.rs | 74 ++++++++++++++++++++++++++++++++++++----------
zed/src/channel.rs  | 14 +++++++-
2 files changed, 69 insertions(+), 19 deletions(-)

Detailed changes

server/src/tests.rs 🔗

@@ -4,7 +4,7 @@ use crate::{
     github, rpc, AppState, Config,
 };
 use async_std::task;
-use gpui::TestAppContext;
+use gpui::{ModelHandle, TestAppContext};
 use rand::prelude::*;
 use serde_json::json;
 use sqlx::{
@@ -14,13 +14,12 @@ use sqlx::{
 };
 use std::{path::Path, sync::Arc};
 use zed::{
-    channel::{ChannelDetails, ChannelList},
+    channel::{Channel, ChannelDetails, ChannelList},
     editor::Editor,
     fs::{FakeFs, Fs as _},
     language::LanguageRegistry,
     rpc::Client,
-    settings,
-    test::Channel,
+    settings, test,
     worktree::Worktree,
 };
 use zrpc::Peer;
@@ -479,11 +478,11 @@ async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext)
 }
 
 #[gpui::test]
-async fn test_basic_chat(mut cx_a: TestAppContext, cx_b: TestAppContext) {
+async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
     // Connect to a server as 2 clients.
     let mut server = TestServer::start().await;
     let (user_id_a, client_a) = server.create_client(&mut cx_a, "user_a").await;
-    let (user_id_b, client_b) = server.create_client(&mut cx_a, "user_b").await;
+    let (user_id_b, client_b) = server.create_client(&mut cx_b, "user_b").await;
 
     // Create an org that includes these 2 users.
     let db = &server.app_state.db;
@@ -520,22 +519,37 @@ async fn test_basic_chat(mut cx_a: TestAppContext, cx_b: TestAppContext) {
             }]
         )
     });
-
     let channel_a = channels_a.update(&mut cx_a, |this, cx| {
         this.get_channel(channel_id.to_proto(), cx).unwrap()
     });
     channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
     channel_a.next_notification(&cx_a).await;
-    channel_a.read_with(&cx_a, |channel, _| {
+    assert_eq!(
+        channel_messages(&channel_a, &cx_a),
+        &[(user_id_b.to_proto(), "hello A, it's B.".to_string())]
+    );
+
+    let channels_b = ChannelList::new(client_b, &mut cx_b.to_async())
+        .await
+        .unwrap();
+    channels_b.read_with(&cx_b, |list, _| {
         assert_eq!(
-            channel
-                .messages()
-                .iter()
-                .map(|m| (m.sender_id, m.body.as_ref()))
-                .collect::<Vec<_>>(),
-            &[(user_id_b.to_proto(), "hello A, it's B.")]
-        );
+            list.available_channels(),
+            &[ChannelDetails {
+                id: channel_id.to_proto(),
+                name: "test-channel".to_string()
+            }]
+        )
     });
+    let channel_b = channels_b.update(&mut cx_b, |this, cx| {
+        this.get_channel(channel_id.to_proto(), cx).unwrap()
+    });
+    channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
+    channel_b.next_notification(&cx_b).await;
+    assert_eq!(
+        channel_messages(&channel_b, &cx_b),
+        &[(user_id_b.to_proto(), "hello A, it's B.".to_string())]
+    );
 
     channel_a.update(&mut cx_a, |channel, cx| {
         channel.send_message("oh, hi B.".to_string(), cx).unwrap();
@@ -549,8 +563,34 @@ async fn test_basic_chat(mut cx_a: TestAppContext, cx_b: TestAppContext) {
             &["oh, hi B.", "sup"]
         )
     });
-
     channel_a.next_notification(&cx_a).await;
+    channel_a.read_with(&cx_a, |channel, _| {
+        assert_eq!(channel.pending_messages().len(), 1);
+    });
+    channel_a.next_notification(&cx_a).await;
+    channel_a.read_with(&cx_a, |channel, _| {
+        assert_eq!(channel.pending_messages().len(), 0);
+    });
+
+    channel_b.next_notification(&cx_b).await;
+    assert_eq!(
+        channel_messages(&channel_b, &cx_b),
+        &[
+            (user_id_b.to_proto(), "hello A, it's B.".to_string()),
+            (user_id_a.to_proto(), "oh, hi B.".to_string()),
+            (user_id_a.to_proto(), "sup".to_string()),
+        ]
+    );
+
+    fn channel_messages(channel: &ModelHandle<Channel>, cx: &TestAppContext) -> Vec<(u64, String)> {
+        channel.read_with(cx, |channel, _| {
+            channel
+                .messages()
+                .iter()
+                .map(|m| (m.sender_id, m.body.clone()))
+                .collect()
+        })
+    }
 }
 
 struct TestServer {
@@ -582,7 +622,7 @@ impl TestServer {
     ) -> (UserId, Arc<Client>) {
         let user_id = self.app_state.db.create_user(name, false).await.unwrap();
         let client = Client::new();
-        let (client_conn, server_conn) = Channel::bidirectional();
+        let (client_conn, server_conn) = test::Channel::bidirectional();
         cx.background()
             .spawn(
                 self.server

zed/src/channel.rs 🔗

@@ -146,7 +146,7 @@ impl Channel {
 
     pub fn send_message(&mut self, body: String, cx: &mut ModelContext<Self>) -> Result<()> {
         let channel_id = self.details.id;
-        let current_user_id = self.rpc.user_id().ok_or_else(|| anyhow!("not logged in"))?;
+        let current_user_id = self.current_user_id()?;
         let local_id = self.next_local_message_id;
         self.next_local_message_id += 1;
         self.pending_messages.push(PendingChannelMessage {
@@ -187,12 +187,22 @@ impl Channel {
         &self.pending_messages
     }
 
+    fn current_user_id(&self) -> Result<u64> {
+        self.rpc.user_id().ok_or_else(|| anyhow!("not logged in"))
+    }
+
     fn handle_message_sent(
         &mut self,
         message: TypedEnvelope<ChannelMessageSent>,
-        rpc: Arc<rpc::Client>,
+        _: Arc<rpc::Client>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
+        let message = message
+            .payload
+            .message
+            .ok_or_else(|| anyhow!("empty message"))?;
+        self.messages.push_back(message.into());
+        cx.notify();
         Ok(())
     }
 }