Implement channel changes for messages

Mikayla created

Change summary

crates/channel/src/channel_store.rs               |  28 ++++
crates/channel/src/channel_store/channel_index.rs |  13 ++
crates/collab/src/db.rs                           |   3 
crates/collab/src/db/queries/channels.rs          |   5 
crates/collab/src/db/queries/messages.rs          |  40 +++---
crates/collab/src/db/tests/message_tests.rs       |   2 
crates/collab/src/rpc.rs                          |  38 +++++
crates/collab/src/tests/channel_buffer_tests.rs   |   1 
crates/collab/src/tests/channel_message_tests.rs  | 105 ++++++++++++++++
crates/collab_ui/src/collab_panel.rs              |   2 
crates/gpui/src/app.rs                            |   2 
crates/rpc/proto/zed.proto                        |   1 
12 files changed, 212 insertions(+), 28 deletions(-)

Detailed changes

crates/channel/src/channel_store.rs 🔗

@@ -44,6 +44,7 @@ pub struct Channel {
     pub id: ChannelId,
     pub name: String,
     pub has_note_changed: bool,
+    pub has_new_messages: bool,
 }
 
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
@@ -223,6 +224,13 @@ impl ChannelStore {
             .map(|channel| channel.has_note_changed)
     }
 
+    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
+        self.channel_index
+            .by_id()
+            .get(&channel_id)
+            .map(|channel| channel.has_new_messages)
+    }
+
     pub fn open_channel_chat(
         &mut self,
         channel_id: ChannelId,
@@ -230,12 +238,20 @@ impl ChannelStore {
     ) -> Task<Result<ModelHandle<ChannelChat>>> {
         let client = self.client.clone();
         let user_store = self.user_store.clone();
-        self.open_channel_resource(
+        let open_channel_chat = self.open_channel_resource(
             channel_id,
             |this| &mut this.opened_chats,
             |channel, cx| ChannelChat::new(channel, user_store, client, cx),
             cx,
-        )
+        );
+        cx.spawn(|this, mut cx| async move {
+            let chat = open_channel_chat.await?;
+            this.update(&mut cx, |this, cx| {
+                this.channel_index.clear_message_changed(channel_id);
+                cx.notify();
+            });
+            Ok(chat)
+        })
     }
 
     /// Asynchronously open a given resource associated with a channel.
@@ -796,6 +812,7 @@ impl ChannelStore {
                         id: channel.id,
                         name: channel.name,
                         has_note_changed: false,
+                        has_new_messages: false,
                     }),
                 ),
             }
@@ -805,7 +822,8 @@ impl ChannelStore {
             || !payload.delete_channels.is_empty()
             || !payload.insert_edge.is_empty()
             || !payload.delete_edge.is_empty()
-            || !payload.notes_changed.is_empty();
+            || !payload.notes_changed.is_empty()
+            || !payload.new_messages.is_empty();
 
         if channels_changed {
             if !payload.delete_channels.is_empty() {
@@ -836,6 +854,10 @@ impl ChannelStore {
                 index.note_changed(id_changed);
             }
 
+            for id_changed in payload.new_messages {
+                index.new_messages(id_changed);
+            }
+
             for edge in payload.insert_edge {
                 index.insert_edge(edge.channel_id, edge.parent_id);
             }

crates/channel/src/channel_store/channel_index.rs 🔗

@@ -44,6 +44,12 @@ impl ChannelIndex {
             Arc::make_mut(channel).has_note_changed = false;
         }
     }
+
+    pub fn clear_message_changed(&mut self, channel_id: ChannelId) {
+        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
+            Arc::make_mut(channel).has_new_messages = false;
+        }
+    }
 }
 
 impl Deref for ChannelIndex {
@@ -88,6 +94,12 @@ impl<'a> ChannelPathsInsertGuard<'a> {
         }
     }
 
+    pub fn new_messages(&mut self, channel_id: ChannelId) {
+        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
+            Arc::make_mut(channel).has_new_messages = true;
+        }
+    }
+
     pub fn insert(&mut self, channel_proto: proto::Channel) {
         if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
             Arc::make_mut(existing_channel).name = channel_proto.name;
@@ -98,6 +110,7 @@ impl<'a> ChannelPathsInsertGuard<'a> {
                     id: channel_proto.id,
                     name: channel_proto.name,
                     has_note_changed: false,
+                    has_new_messages: false,
                 }),
             );
             self.insert_root(channel_proto.id);

crates/collab/src/db.rs 🔗

@@ -436,8 +436,9 @@ pub struct Channel {
 pub struct ChannelsForUser {
     pub channels: ChannelGraph,
     pub channel_participants: HashMap<ChannelId, Vec<UserId>>,
-    pub channels_with_changed_notes: HashSet<ChannelId>,
     pub channels_with_admin_privileges: HashSet<ChannelId>,
+    pub channels_with_changed_notes: HashSet<ChannelId>,
+    pub channels_with_new_messages: HashSet<ChannelId>,
 }
 
 #[derive(Debug)]

crates/collab/src/db/queries/channels.rs 🔗

@@ -464,10 +464,14 @@ impl Database {
         }
 
         let mut channels_with_changed_notes = HashSet::default();
+        let mut channels_with_new_messages = HashSet::default();
         for channel in graph.channels.iter() {
             if self.has_note_changed(user_id, channel.id, tx).await? {
                 channels_with_changed_notes.insert(channel.id);
             }
+            if self.has_new_message(channel.id, user_id, tx).await? {
+                channels_with_new_messages.insert(channel.id);
+            }
         }
 
         Ok(ChannelsForUser {
@@ -475,6 +479,7 @@ impl Database {
             channel_participants,
             channels_with_admin_privileges,
             channels_with_changed_notes,
+            channels_with_new_messages,
         })
     }
 

crates/collab/src/db/queries/messages.rs 🔗

@@ -97,7 +97,7 @@ impl Database {
             let mut messages = Vec::new();
             while let Some(row) = rows.next().await {
                 let row = row?;
-                dbg!(&max_id);
+
                 max_assign(&mut max_id, row.id);
 
                 let nonce = row.nonce.as_u64_pair();
@@ -113,23 +113,18 @@ impl Database {
                 });
             }
             drop(rows);
-            dbg!(&max_id);
 
             if let Some(max_id) = max_id {
-                let has_older_message = dbg!(
-                    observed_channel_messages::Entity::find()
-                        .filter(
-                            observed_channel_messages::Column::UserId
-                                .eq(user_id)
-                                .and(observed_channel_messages::Column::ChannelId.eq(channel_id))
-                                .and(
-                                    observed_channel_messages::Column::ChannelMessageId.lt(max_id)
-                                ),
-                        )
-                        .one(&*tx)
-                        .await
-                )?
-                .is_some();
+                let has_older_message = observed_channel_messages::Entity::find()
+                    .filter(
+                        observed_channel_messages::Column::UserId
+                            .eq(user_id)
+                            .and(observed_channel_messages::Column::ChannelId.eq(channel_id))
+                            .and(observed_channel_messages::Column::ChannelMessageId.lt(max_id)),
+                    )
+                    .one(&*tx)
+                    .await?
+                    .is_some();
 
                 if has_older_message {
                     observed_channel_messages::Entity::update(
@@ -174,7 +169,7 @@ impl Database {
         body: &str,
         timestamp: OffsetDateTime,
         nonce: u128,
-    ) -> Result<(MessageId, Vec<ConnectionId>)> {
+    ) -> Result<(MessageId, Vec<ConnectionId>, Vec<UserId>)> {
         self.transaction(|tx| async move {
             let mut rows = channel_chat_participant::Entity::find()
                 .filter(channel_chat_participant::Column::ChannelId.eq(channel_id))
@@ -241,7 +236,14 @@ impl Database {
             .exec(&*tx)
             .await?;
 
-            Ok((message.last_insert_id, participant_connection_ids))
+            let mut channel_members = self.get_channel_members_internal(channel_id, &*tx).await?;
+            channel_members.retain(|member| !participant_user_ids.contains(member));
+
+            Ok((
+                message.last_insert_id,
+                participant_connection_ids,
+                channel_members,
+            ))
         })
         .await
     }
@@ -290,7 +292,7 @@ impl Database {
             .await?
             .map(|model| model.channel_message_id);
 
-        Ok(dbg!(last_message_read) != dbg!(latest_message_id))
+        Ok(last_message_read != latest_message_id)
     }
 
     pub async fn remove_channel_message(

crates/collab/src/db/tests/message_tests.rs 🔗

@@ -120,7 +120,7 @@ async fn test_channel_message_new_notification(db: &Arc<Database>) {
         .await
         .unwrap();
 
-    let (second_message, _) = db
+    let (second_message, _, _) = db
         .create_channel_message(channel, user_a, "2", OffsetDateTime::now_utc(), 2)
         .await
         .unwrap();

crates/collab/src/rpc.rs 🔗

@@ -2568,6 +2568,16 @@ async fn respond_to_channel_invite(
                         name: channel.name,
                     }),
             );
+        update.notes_changed = result
+            .channels_with_changed_notes
+            .iter()
+            .map(|id| id.to_proto())
+            .collect();
+        update.new_messages = result
+            .channels_with_new_messages
+            .iter()
+            .map(|id| id.to_proto())
+            .collect();
         update.insert_edge = result.channels.edges;
         update
             .channel_participants
@@ -2818,7 +2828,7 @@ async fn send_channel_message(
         .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 
     let channel_id = ChannelId::from_proto(request.channel_id);
-    let (message_id, connection_ids) = session
+    let (message_id, connection_ids, non_participants) = session
         .db()
         .await
         .create_channel_message(
@@ -2848,6 +2858,26 @@ async fn send_channel_message(
     response.send(proto::SendChannelMessageResponse {
         message: Some(message),
     })?;
+
+    dbg!(&non_participants);
+    let pool = &*session.connection_pool().await;
+
+    broadcast(
+        None,
+        non_participants
+            .iter()
+            .flat_map(|user_id| pool.user_connection_ids(*user_id)),
+        |peer_id| {
+            session.peer.send(
+                peer_id.into(),
+                proto::UpdateChannels {
+                    new_messages: vec![channel_id.to_proto()],
+                    ..Default::default()
+                },
+            )
+        },
+    );
+
     Ok(())
 }
 
@@ -3011,6 +3041,12 @@ fn build_initial_channels_update(
         .map(|channel_id| channel_id.to_proto())
         .collect();
 
+    update.new_messages = channels
+        .channels_with_new_messages
+        .iter()
+        .map(|channel_id| channel_id.to_proto())
+        .collect();
+
     update.insert_edge = channels.channels.edges;
 
     for (channel_id, participants) in channels.channel_participants {

crates/collab/src/tests/channel_message_tests.rs 🔗

@@ -1,6 +1,6 @@
 use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
 use channel::{ChannelChat, ChannelMessageId};
-use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
+use gpui::{executor::Deterministic, BorrowAppContext, ModelHandle, TestAppContext};
 use std::sync::Arc;
 
 #[gpui::test]
@@ -223,3 +223,106 @@ fn assert_messages(chat: &ModelHandle<ChannelChat>, messages: &[&str], cx: &mut
         messages
     );
 }
+
+#[gpui::test]
+async fn test_channel_message_changes(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+
+    let channel_id = server
+        .make_channel(
+            "the-channel",
+            None,
+            (&client_a, cx_a),
+            &mut [(&client_b, cx_b)],
+        )
+        .await;
+
+    // Client A sends a message, client B should see that there is a new message.
+    let channel_chat_a = client_a
+        .channel_store()
+        .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
+        .await
+        .unwrap();
+
+    channel_chat_a
+        .update(cx_a, |c, cx| c.send_message("one".into(), cx).unwrap())
+        .await
+        .unwrap();
+
+    deterministic.run_until_parked();
+
+    let b_has_messages = cx_b.read_with(|cx| {
+        client_b
+            .channel_store()
+            .read(cx)
+            .has_new_messages(channel_id)
+            .unwrap()
+    });
+
+    assert!(b_has_messages);
+
+    // Opening the chat should clear the changed flag.
+    let channel_chat_b = client_b
+        .channel_store()
+        .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
+        .await
+        .unwrap();
+
+    let b_has_messages = cx_b.read_with(|cx| {
+        client_b
+            .channel_store()
+            .read(cx)
+            .has_new_messages(channel_id)
+            .unwrap()
+    });
+
+    assert!(!b_has_messages);
+
+    // Sending a message while the chat is open should not change the flag.
+    channel_chat_a
+        .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
+        .await
+        .unwrap();
+
+    deterministic.run_until_parked();
+
+    let b_has_messages = cx_b.read_with(|cx| {
+        client_b
+            .channel_store()
+            .read(cx)
+            .has_new_messages(channel_id)
+            .unwrap()
+    });
+
+    assert!(!b_has_messages);
+
+    // Closing the chat should re-enable change tracking
+
+    cx_b.update(|_| {
+        drop(channel_chat_b);
+    });
+
+    deterministic.run_until_parked();
+
+    channel_chat_a
+        .update(cx_a, |c, cx| c.send_message("three".into(), cx).unwrap())
+        .await
+        .unwrap();
+
+    let b_has_messages = cx_b.read_with(|cx| {
+        client_b
+            .channel_store()
+            .read(cx)
+            .has_new_messages(channel_id)
+            .unwrap()
+    });
+
+    assert!(b_has_messages);
+}

crates/collab_ui/src/collab_panel.rs 🔗

@@ -1821,7 +1821,7 @@ impl CollabPanel {
                         channel.name.clone(),
                         theme
                             .channel_name
-                            .in_state(channel.has_note_changed)
+                            .in_state(channel.has_new_messages)
                             .text
                             .clone(),
                     )

crates/gpui/src/app.rs 🔗

@@ -1252,7 +1252,7 @@ impl AppContext {
                 result
             })
         } else {
-            panic!("circular model update");
+            panic!("circular model update for {}", std::any::type_name::<T>());
         }
     }
 

crates/rpc/proto/zed.proto 🔗

@@ -956,6 +956,7 @@ message UpdateChannels {
     repeated ChannelParticipants channel_participants = 7;
     repeated ChannelPermission channel_permissions = 8;
     repeated uint64 notes_changed = 9;
+    repeated uint64 new_messages = 10;
 }
 
 message ChannelEdge {