Fix notes unread status (#7643)

Conrad Irwin created

1. The client-side comparison was wrong
2. The server never told the client about the version it remembered
3. The server generated broken timestamps in some cases

Release Notes:

- Fixed the notes/chat appearing as unread too often

**or**

- N/A

Change summary

crates/channel/src/channel_store.rs             |   7 
crates/collab/src/db.rs                         |   3 
crates/collab/src/db/queries/buffers.rs         |  46 ++++--
crates/collab/src/db/queries/channels.rs        |  28 +++
crates/collab/src/db/queries/messages.rs        |  24 +++
crates/collab/src/db/tests/buffer_tests.rs      |  17 -
crates/collab/src/rpc.rs                        |  11 
crates/collab/src/tests/channel_buffer_tests.rs | 126 ++++++------------
crates/collab/src/tests/test_server.rs          |  11 +
9 files changed, 155 insertions(+), 118 deletions(-)

Detailed changes

crates/channel/src/channel_store.rs 🔗

@@ -1131,9 +1131,10 @@ impl ChannelState {
         if let Some(latest_version) = &self.latest_notes_versions {
             if let Some(observed_version) = &self.observed_notes_versions {
                 latest_version.epoch > observed_version.epoch
-                    || latest_version
-                        .version
-                        .changed_since(&observed_version.version)
+                    || (latest_version.epoch == observed_version.epoch
+                        && latest_version
+                            .version
+                            .changed_since(&observed_version.version))
             } else {
                 true
             }

crates/collab/src/db.rs 🔗

@@ -587,6 +587,9 @@ pub struct ChannelsForUser {
     pub channels: Vec<Channel>,
     pub channel_memberships: Vec<channel_member::Model>,
     pub channel_participants: HashMap<ChannelId, Vec<UserId>>,
+
+    pub observed_buffer_versions: Vec<proto::ChannelBufferVersion>,
+    pub observed_channel_messages: Vec<proto::ChannelMessageId>,
     pub latest_buffer_versions: Vec<proto::ChannelBufferVersion>,
     pub latest_channel_messages: Vec<proto::ChannelMessageId>,
 }

crates/collab/src/db/queries/buffers.rs 🔗

@@ -561,7 +561,6 @@ impl Database {
         tx: &DatabaseTransaction,
     ) -> Result<()> {
         use observed_buffer_edits::Column;
-
         observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
             user_id: ActiveValue::Set(user_id),
             buffer_id: ActiveValue::Set(buffer_id),
@@ -671,7 +670,7 @@ impl Database {
                 buffer_id: row.buffer_id,
                 epoch: row.epoch,
                 lamport_timestamp: row.lamport_timestamp,
-                replica_id: row.lamport_timestamp,
+                replica_id: row.replica_id,
                 value: Default::default(),
             });
             operations.push(proto::Operation {
@@ -750,20 +749,9 @@ impl Database {
 
     pub async fn latest_channel_buffer_changes(
         &self,
-        channel_ids: &[ChannelId],
+        channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
         tx: &DatabaseTransaction,
     ) -> Result<Vec<proto::ChannelBufferVersion>> {
-        let mut channel_ids_by_buffer_id = HashMap::default();
-        let mut rows = buffer::Entity::find()
-            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
-            .stream(&*tx)
-            .await?;
-        while let Some(row) = rows.next().await {
-            let row = row?;
-            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
-        }
-        drop(rows);
-
         let latest_operations = self
             .get_latest_operations_for_buffers(channel_ids_by_buffer_id.keys().copied(), &*tx)
             .await?;
@@ -783,6 +771,36 @@ impl Database {
             .collect())
     }
 
+    pub async fn observed_channel_buffer_changes(
+        &self,
+        channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
+        user_id: UserId,
+        tx: &DatabaseTransaction,
+    ) -> Result<Vec<proto::ChannelBufferVersion>> {
+        let observed_operations = observed_buffer_edits::Entity::find()
+            .filter(observed_buffer_edits::Column::UserId.eq(user_id))
+            .filter(
+                observed_buffer_edits::Column::BufferId
+                    .is_in(channel_ids_by_buffer_id.keys().copied()),
+            )
+            .all(&*tx)
+            .await?;
+
+        Ok(observed_operations
+            .iter()
+            .flat_map(|op| {
+                Some(proto::ChannelBufferVersion {
+                    channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
+                    epoch: op.epoch as u64,
+                    version: vec![proto::VectorClockEntry {
+                        replica_id: op.replica_id as u32,
+                        timestamp: op.lamport_timestamp as u32,
+                    }],
+                })
+            })
+            .collect())
+    }
+
     /// Returns the latest operations for the buffers with the specified IDs.
     pub async fn get_latest_operations_for_buffers(
         &self,

crates/collab/src/db/queries/channels.rs 🔗

@@ -673,18 +673,40 @@ impl Database {
         }
 
         let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
+
+        let mut channel_ids_by_buffer_id = HashMap::default();
+        let mut rows = buffer::Entity::find()
+            .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
+            .stream(&*tx)
+            .await?;
+        while let Some(row) = rows.next().await {
+            let row = row?;
+            channel_ids_by_buffer_id.insert(row.id, row.channel_id);
+        }
+        drop(rows);
+
         let latest_buffer_versions = self
-            .latest_channel_buffer_changes(&channel_ids, &*tx)
+            .latest_channel_buffer_changes(&channel_ids_by_buffer_id, &*tx)
             .await?;
 
-        let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
+        let latest_channel_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
+
+        let observed_buffer_versions = self
+            .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, &*tx)
+            .await?;
+
+        let observed_channel_messages = self
+            .observed_channel_messages(&channel_ids, user_id, &*tx)
+            .await?;
 
         Ok(ChannelsForUser {
             channel_memberships,
             channels,
             channel_participants,
             latest_buffer_versions,
-            latest_channel_messages: latest_messages,
+            latest_channel_messages,
+            observed_buffer_versions,
+            observed_channel_messages,
         })
     }
 

crates/collab/src/db/queries/messages.rs 🔗

@@ -388,6 +388,30 @@ impl Database {
         Ok(())
     }
 
+    pub async fn observed_channel_messages(
+        &self,
+        channel_ids: &[ChannelId],
+        user_id: UserId,
+        tx: &DatabaseTransaction,
+    ) -> Result<Vec<proto::ChannelMessageId>> {
+        let rows = observed_channel_messages::Entity::find()
+            .filter(observed_channel_messages::Column::UserId.eq(user_id))
+            .filter(
+                observed_channel_messages::Column::ChannelId
+                    .is_in(channel_ids.iter().map(|id| id.0)),
+            )
+            .all(&*tx)
+            .await?;
+
+        Ok(rows
+            .into_iter()
+            .map(|message| proto::ChannelMessageId {
+                channel_id: message.channel_id.to_proto(),
+                message_id: message.channel_message_id.to_proto(),
+            })
+            .collect())
+    }
+
     pub async fn latest_channel_messages(
         &self,
         channel_ids: &[ChannelId],

crates/collab/src/db/tests/buffer_tests.rs 🔗

@@ -337,17 +337,12 @@ async fn test_channel_buffers_last_operations(db: &Database) {
     let buffer_changes = db
         .transaction(|tx| {
             let buffers = &buffers;
-            async move {
-                db.latest_channel_buffer_changes(
-                    &[
-                        buffers[0].channel_id,
-                        buffers[1].channel_id,
-                        buffers[2].channel_id,
-                    ],
-                    &*tx,
-                )
-                .await
-            }
+            let mut hash = HashMap::default();
+            hash.insert(buffers[0].id, buffers[0].channel_id);
+            hash.insert(buffers[1].id, buffers[1].channel_id);
+            hash.insert(buffers[2].id, buffers[2].channel_id);
+
+            async move { db.latest_channel_buffer_changes(&hash, &*tx).await }
         })
         .await
         .unwrap();

crates/collab/src/rpc.rs 🔗

@@ -620,7 +620,7 @@ impl Server {
                 let mut pool = this.connection_pool.lock();
                 pool.add_connection(connection_id, user_id, user.admin);
                 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
-                this.peer.send(connection_id, build_update_user_channels(&channels_for_user.channel_memberships))?;
+                this.peer.send(connection_id, build_update_user_channels(&channels_for_user))?;
                 this.peer.send(connection_id, build_channels_update(
                     channels_for_user,
                     channel_invites
@@ -3440,17 +3440,18 @@ fn notify_membership_updated(
     }
 }
 
-fn build_update_user_channels(
-    memberships: &Vec<db::channel_member::Model>,
-) -> proto::UpdateUserChannels {
+fn build_update_user_channels(channels: &ChannelsForUser) -> proto::UpdateUserChannels {
     proto::UpdateUserChannels {
-        channel_memberships: memberships
+        channel_memberships: channels
+            .channel_memberships
             .iter()
             .map(|m| proto::ChannelMembership {
                 channel_id: m.channel_id.to_proto(),
                 role: m.role.into(),
             })
             .collect(),
+        observed_channel_buffer_version: channels.observed_buffer_versions.clone(),
+        observed_channel_message_id: channels.observed_channel_messages.clone(),
         ..Default::default()
     }
 }

crates/collab/src/tests/channel_buffer_tests.rs 🔗

@@ -1,6 +1,6 @@
 use crate::{
     rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
-    tests::TestServer,
+    tests::{test_server::open_channel_notes, TestServer},
 };
 use call::ActiveCall;
 use channel::ACKNOWLEDGE_DEBOUNCE_INTERVAL;
@@ -605,113 +605,75 @@ async fn test_channel_buffer_changes(
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    let mut server = TestServer::start(deterministic.clone()).await;
-    let client_a = server.create_client(cx_a, "user_a").await;
-    let client_b = server.create_client(cx_b, "user_b").await;
-
-    let channel_id = server
-        .make_channel(
-            "the-channel",
-            None,
-            (&client_a, cx_a),
-            &mut [(&client_b, cx_b)],
-        )
-        .await;
+    let (server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
+    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
+    let (workspace_b, cx_b) = client_b.build_test_workspace(cx_b).await;
+    let channel_store_b = client_b.channel_store().clone();
 
-    let channel_buffer_a = client_a
-        .channel_store()
-        .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx))
-        .await
-        .unwrap();
-
-    // Client A makes an edit, and client B should see that the note has changed.
-    channel_buffer_a.update(cx_a, |buffer, cx| {
-        buffer.buffer().update(cx, |buffer, cx| {
-            buffer.edit([(0..0, "1")], None, cx);
-        })
-    });
-    deterministic.run_until_parked();
-
-    let has_buffer_changed = cx_b.update(|cx| {
-        client_b
-            .channel_store()
-            .read(cx)
-            .has_channel_buffer_changed(channel_id)
+    // Editing the channel notes should set them to dirty
+    open_channel_notes(channel_id, cx_a).await.unwrap();
+    cx_a.simulate_keystrokes("1");
+    channel_store_b.read_with(cx_b, |channel_store, _| {
+        assert!(channel_store.has_channel_buffer_changed(channel_id))
     });
-    assert!(has_buffer_changed);
 
     // Opening the buffer should clear the changed flag.
-    let project_b = client_b.build_empty_local_project(cx_b);
-    let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
-    let channel_view_b = cx_b
-        .update(|cx| ChannelView::open(channel_id, None, workspace_b.clone(), cx))
-        .await
-        .unwrap();
-    deterministic.run_until_parked();
-
-    let has_buffer_changed = cx_b.update(|cx| {
-        client_b
-            .channel_store()
-            .read(cx)
-            .has_channel_buffer_changed(channel_id)
+    open_channel_notes(channel_id, cx_b).await.unwrap();
+    channel_store_b.read_with(cx_b, |channel_store, _| {
+        assert!(!channel_store.has_channel_buffer_changed(channel_id))
     });
-    assert!(!has_buffer_changed);
 
     // Editing the channel while the buffer is open should not show that the buffer has changed.
-    channel_buffer_a.update(cx_a, |buffer, cx| {
-        buffer.buffer().update(cx, |buffer, cx| {
-            buffer.edit([(0..0, "2")], None, cx);
-        })
-    });
-    deterministic.run_until_parked();
-
-    let has_buffer_changed = cx_b.read(|cx| {
-        client_b
-            .channel_store()
-            .read(cx)
-            .has_channel_buffer_changed(channel_id)
+    cx_a.simulate_keystrokes("2");
+    channel_store_b.read_with(cx_b, |channel_store, _| {
+        assert!(!channel_store.has_channel_buffer_changed(channel_id))
     });
-    assert!(!has_buffer_changed);
-
-    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
 
     // Test that the server is tracking things correctly, and we retain our 'not changed'
     // state across a disconnect
+    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
     server
         .simulate_long_connection_interruption(client_b.peer_id().unwrap(), deterministic.clone());
-    let has_buffer_changed = cx_b.read(|cx| {
-        client_b
-            .channel_store()
-            .read(cx)
-            .has_channel_buffer_changed(channel_id)
+    channel_store_b.read_with(cx_b, |channel_store, _| {
+        assert!(!channel_store.has_channel_buffer_changed(channel_id))
     });
-    assert!(!has_buffer_changed);
 
     // Closing the buffer should re-enable change tracking
     cx_b.update(|cx| {
         workspace_b.update(cx, |workspace, cx| {
             workspace.close_all_items_and_panes(&Default::default(), cx)
         });
-
-        drop(channel_view_b)
     });
-
     deterministic.run_until_parked();
 
-    channel_buffer_a.update(cx_a, |buffer, cx| {
-        buffer.buffer().update(cx, |buffer, cx| {
-            buffer.edit([(0..0, "3")], None, cx);
-        })
+    cx_a.simulate_keystrokes("3");
+    channel_store_b.read_with(cx_b, |channel_store, _| {
+        assert!(channel_store.has_channel_buffer_changed(channel_id))
     });
-    deterministic.run_until_parked();
+}
 
-    let has_buffer_changed = cx_b.read(|cx| {
-        client_b
-            .channel_store()
-            .read(cx)
-            .has_channel_buffer_changed(channel_id)
+#[gpui::test]
+async fn test_channel_buffer_changes_persist(
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+    cx_b2: &mut TestAppContext,
+) {
+    let (mut server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
+    let (_, cx_a) = client_a.build_test_workspace(cx_a).await;
+    let (_, cx_b) = client_b.build_test_workspace(cx_b).await;
+
+    // a) edits the notes
+    open_channel_notes(channel_id, cx_a).await.unwrap();
+    cx_a.simulate_keystrokes("1");
+    // b) opens them to observe the current version
+    open_channel_notes(channel_id, cx_b).await.unwrap();
+
+    // On boot the client should get the correct state.
+    let client_b2 = server.create_client(cx_b2, "user_b").await;
+    let channel_store_b2 = client_b2.channel_store().clone();
+    channel_store_b2.read_with(cx_b2, |channel_store, _| {
+        assert!(!channel_store.has_channel_buffer_changed(channel_id))
     });
-    assert!(has_buffer_changed);
 }
 
 #[track_caller]

crates/collab/src/tests/test_server.rs 🔗

@@ -10,6 +10,7 @@ use channel::{ChannelBuffer, ChannelStore};
 use client::{
     self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore,
 };
+use collab_ui::channel_view::ChannelView;
 use collections::{HashMap, HashSet};
 use fs::FakeFs;
 use futures::{channel::oneshot, StreamExt as _};
@@ -766,6 +767,16 @@ pub fn join_channel_call(cx: &mut TestAppContext) -> Task<anyhow::Result<()>> {
     room.unwrap().update(cx, |room, cx| room.join_call(cx))
 }
 
+pub fn open_channel_notes(
+    channel_id: u64,
+    cx: &mut VisualTestContext,
+) -> Task<anyhow::Result<View<ChannelView>>> {
+    let window = cx.update(|cx| cx.active_window().unwrap().downcast::<Workspace>().unwrap());
+    let view = window.root_view(cx).unwrap();
+
+    cx.update(|cx| ChannelView::open(channel_id, None, view.clone(), cx))
+}
+
 impl Drop for TestClient {
     fn drop(&mut self) {
         self.app_state.client.teardown();