From 68893c2ae6b69ece0fc4f0ee9e48e5a304d22645 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Fri, 9 Feb 2024 23:12:26 -0700 Subject: [PATCH] Fix notes unread status (#7643) 1. The client-side comparison was wrong 2. The server never told the client about the version it remembered 3. The server generated broken timestamps in some cases Release Notes: - Fixed the notes/chat appearing as unread too often **or** - N/A --- crates/channel/src/channel_store.rs | 7 +- crates/collab/src/db.rs | 3 + crates/collab/src/db/queries/buffers.rs | 46 +++++-- crates/collab/src/db/queries/channels.rs | 28 +++- crates/collab/src/db/queries/messages.rs | 24 ++++ crates/collab/src/db/tests/buffer_tests.rs | 17 +-- crates/collab/src/rpc.rs | 11 +- .../collab/src/tests/channel_buffer_tests.rs | 126 ++++++------------ crates/collab/src/tests/test_server.rs | 11 ++ 9 files changed, 155 insertions(+), 118 deletions(-) diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index 8379a92aaeee2cae20737364ba5b47e8896a5c0f..c1e6d802dd626befe3e4714e35675e4f262cdc8e 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -1131,9 +1131,10 @@ impl ChannelState { if let Some(latest_version) = &self.latest_notes_versions { if let Some(observed_version) = &self.observed_notes_versions { latest_version.epoch > observed_version.epoch - || latest_version - .version - .changed_since(&observed_version.version) + || (latest_version.epoch == observed_version.epoch + && latest_version + .version + .changed_since(&observed_version.version)) } else { true } diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 6f6a95ebc2596b99041a6638e2027a7ec2cb5c13..6ccea25a600b96c22e4bee351f7cf13a794b6ad2 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -587,6 +587,9 @@ pub struct ChannelsForUser { pub channels: Vec, pub channel_memberships: Vec, pub channel_participants: HashMap>, + + pub observed_buffer_versions: Vec, + pub observed_channel_messages: Vec, pub latest_buffer_versions: Vec, pub latest_channel_messages: Vec, } diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index b42e26e4654c1591d8b9d567b8b4d2c5081b64fe..e814ea42a4d008cc44af9ee1407559698fcc86a2 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -561,7 +561,6 @@ impl Database { tx: &DatabaseTransaction, ) -> Result<()> { use observed_buffer_edits::Column; - observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel { user_id: ActiveValue::Set(user_id), buffer_id: ActiveValue::Set(buffer_id), @@ -671,7 +670,7 @@ impl Database { buffer_id: row.buffer_id, epoch: row.epoch, lamport_timestamp: row.lamport_timestamp, - replica_id: row.lamport_timestamp, + replica_id: row.replica_id, value: Default::default(), }); operations.push(proto::Operation { @@ -750,20 +749,9 @@ impl Database { pub async fn latest_channel_buffer_changes( &self, - channel_ids: &[ChannelId], + channel_ids_by_buffer_id: &HashMap, tx: &DatabaseTransaction, ) -> Result> { - let mut channel_ids_by_buffer_id = HashMap::default(); - let mut rows = buffer::Entity::find() - .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied())) - .stream(&*tx) - .await?; - while let Some(row) = rows.next().await { - let row = row?; - channel_ids_by_buffer_id.insert(row.id, row.channel_id); - } - drop(rows); - let latest_operations = self .get_latest_operations_for_buffers(channel_ids_by_buffer_id.keys().copied(), &*tx) .await?; @@ -783,6 +771,36 @@ impl Database { .collect()) } + pub async fn observed_channel_buffer_changes( + &self, + channel_ids_by_buffer_id: &HashMap, + user_id: UserId, + tx: &DatabaseTransaction, + ) -> Result> { + let observed_operations = observed_buffer_edits::Entity::find() + .filter(observed_buffer_edits::Column::UserId.eq(user_id)) + .filter( + observed_buffer_edits::Column::BufferId + .is_in(channel_ids_by_buffer_id.keys().copied()), + ) + .all(&*tx) + .await?; + + Ok(observed_operations + .iter() + .flat_map(|op| { + Some(proto::ChannelBufferVersion { + channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(), + epoch: op.epoch as u64, + version: vec![proto::VectorClockEntry { + replica_id: op.replica_id as u32, + timestamp: op.lamport_timestamp as u32, + }], + }) + }) + .collect()) + } + /// Returns the latest operations for the buffers with the specified IDs. pub async fn get_latest_operations_for_buffers( &self, diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index c37ea26fc154d28762a7a0557ef1aa62fb233714..7c88cd8aa03cae59ec4bb438b9a248d6dc16cb8b 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -673,18 +673,40 @@ impl Database { } let channel_ids = channels.iter().map(|c| c.id).collect::>(); + + let mut channel_ids_by_buffer_id = HashMap::default(); + let mut rows = buffer::Entity::find() + .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied())) + .stream(&*tx) + .await?; + while let Some(row) = rows.next().await { + let row = row?; + channel_ids_by_buffer_id.insert(row.id, row.channel_id); + } + drop(rows); + let latest_buffer_versions = self - .latest_channel_buffer_changes(&channel_ids, &*tx) + .latest_channel_buffer_changes(&channel_ids_by_buffer_id, &*tx) .await?; - let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?; + let latest_channel_messages = self.latest_channel_messages(&channel_ids, &*tx).await?; + + let observed_buffer_versions = self + .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, &*tx) + .await?; + + let observed_channel_messages = self + .observed_channel_messages(&channel_ids, user_id, &*tx) + .await?; Ok(ChannelsForUser { channel_memberships, channels, channel_participants, latest_buffer_versions, - latest_channel_messages: latest_messages, + latest_channel_messages, + observed_buffer_versions, + observed_channel_messages, }) } diff --git a/crates/collab/src/db/queries/messages.rs b/crates/collab/src/db/queries/messages.rs index 38a828efa4c91e4311405c80ba614835ea6a8c11..d63b4cf1c5dd58afb101eb629934cf28a2c72284 100644 --- a/crates/collab/src/db/queries/messages.rs +++ b/crates/collab/src/db/queries/messages.rs @@ -388,6 +388,30 @@ impl Database { Ok(()) } + pub async fn observed_channel_messages( + &self, + channel_ids: &[ChannelId], + user_id: UserId, + tx: &DatabaseTransaction, + ) -> Result> { + let rows = observed_channel_messages::Entity::find() + .filter(observed_channel_messages::Column::UserId.eq(user_id)) + .filter( + observed_channel_messages::Column::ChannelId + .is_in(channel_ids.iter().map(|id| id.0)), + ) + .all(&*tx) + .await?; + + Ok(rows + .into_iter() + .map(|message| proto::ChannelMessageId { + channel_id: message.channel_id.to_proto(), + message_id: message.channel_message_id.to_proto(), + }) + .collect()) + } + pub async fn latest_channel_messages( &self, channel_ids: &[ChannelId], diff --git a/crates/collab/src/db/tests/buffer_tests.rs b/crates/collab/src/db/tests/buffer_tests.rs index de02c2b7b04b7b9d6711712de53d7cf9bd950d45..c1015330bb52128fc2532d4cf30ebdb4095c6bcd 100644 --- a/crates/collab/src/db/tests/buffer_tests.rs +++ b/crates/collab/src/db/tests/buffer_tests.rs @@ -337,17 +337,12 @@ async fn test_channel_buffers_last_operations(db: &Database) { let buffer_changes = db .transaction(|tx| { let buffers = &buffers; - async move { - db.latest_channel_buffer_changes( - &[ - buffers[0].channel_id, - buffers[1].channel_id, - buffers[2].channel_id, - ], - &*tx, - ) - .await - } + let mut hash = HashMap::default(); + hash.insert(buffers[0].id, buffers[0].channel_id); + hash.insert(buffers[1].id, buffers[1].channel_id); + hash.insert(buffers[2].id, buffers[2].channel_id); + + async move { db.latest_channel_buffer_changes(&hash, &*tx).await } }) .await .unwrap(); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 01fd0c1a8af2231904fb8f912a11d4f43a5905df..38eff60ef03519a99c0d00f08afe1c5ba2f852b7 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -620,7 +620,7 @@ impl Server { let mut pool = this.connection_pool.lock(); pool.add_connection(connection_id, user_id, user.admin); this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?; - this.peer.send(connection_id, build_update_user_channels(&channels_for_user.channel_memberships))?; + this.peer.send(connection_id, build_update_user_channels(&channels_for_user))?; this.peer.send(connection_id, build_channels_update( channels_for_user, channel_invites @@ -3440,17 +3440,18 @@ fn notify_membership_updated( } } -fn build_update_user_channels( - memberships: &Vec, -) -> proto::UpdateUserChannels { +fn build_update_user_channels(channels: &ChannelsForUser) -> proto::UpdateUserChannels { proto::UpdateUserChannels { - channel_memberships: memberships + channel_memberships: channels + .channel_memberships .iter() .map(|m| proto::ChannelMembership { channel_id: m.channel_id.to_proto(), role: m.role.into(), }) .collect(), + observed_channel_buffer_version: channels.observed_buffer_versions.clone(), + observed_channel_message_id: channels.observed_channel_messages.clone(), ..Default::default() } } diff --git a/crates/collab/src/tests/channel_buffer_tests.rs b/crates/collab/src/tests/channel_buffer_tests.rs index 0ba70246839580bc5524504189c7fdacaf18541b..5f25468530209c386d3eab411ade5b8c56728bf9 100644 --- a/crates/collab/src/tests/channel_buffer_tests.rs +++ b/crates/collab/src/tests/channel_buffer_tests.rs @@ -1,6 +1,6 @@ use crate::{ rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, - tests::TestServer, + tests::{test_server::open_channel_notes, TestServer}, }; use call::ActiveCall; use channel::ACKNOWLEDGE_DEBOUNCE_INTERVAL; @@ -605,113 +605,75 @@ async fn test_channel_buffer_changes( cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - let mut server = TestServer::start(deterministic.clone()).await; - let client_a = server.create_client(cx_a, "user_a").await; - let client_b = server.create_client(cx_b, "user_b").await; - - let channel_id = server - .make_channel( - "the-channel", - None, - (&client_a, cx_a), - &mut [(&client_b, cx_b)], - ) - .await; + let (server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await; + let (_, cx_a) = client_a.build_test_workspace(cx_a).await; + let (workspace_b, cx_b) = client_b.build_test_workspace(cx_b).await; + let channel_store_b = client_b.channel_store().clone(); - let channel_buffer_a = client_a - .channel_store() - .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx)) - .await - .unwrap(); - - // Client A makes an edit, and client B should see that the note has changed. - channel_buffer_a.update(cx_a, |buffer, cx| { - buffer.buffer().update(cx, |buffer, cx| { - buffer.edit([(0..0, "1")], None, cx); - }) - }); - deterministic.run_until_parked(); - - let has_buffer_changed = cx_b.update(|cx| { - client_b - .channel_store() - .read(cx) - .has_channel_buffer_changed(channel_id) + // Editing the channel notes should set them to dirty + open_channel_notes(channel_id, cx_a).await.unwrap(); + cx_a.simulate_keystrokes("1"); + channel_store_b.read_with(cx_b, |channel_store, _| { + assert!(channel_store.has_channel_buffer_changed(channel_id)) }); - assert!(has_buffer_changed); // Opening the buffer should clear the changed flag. - let project_b = client_b.build_empty_local_project(cx_b); - let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b); - let channel_view_b = cx_b - .update(|cx| ChannelView::open(channel_id, None, workspace_b.clone(), cx)) - .await - .unwrap(); - deterministic.run_until_parked(); - - let has_buffer_changed = cx_b.update(|cx| { - client_b - .channel_store() - .read(cx) - .has_channel_buffer_changed(channel_id) + open_channel_notes(channel_id, cx_b).await.unwrap(); + channel_store_b.read_with(cx_b, |channel_store, _| { + assert!(!channel_store.has_channel_buffer_changed(channel_id)) }); - assert!(!has_buffer_changed); // Editing the channel while the buffer is open should not show that the buffer has changed. - channel_buffer_a.update(cx_a, |buffer, cx| { - buffer.buffer().update(cx, |buffer, cx| { - buffer.edit([(0..0, "2")], None, cx); - }) - }); - deterministic.run_until_parked(); - - let has_buffer_changed = cx_b.read(|cx| { - client_b - .channel_store() - .read(cx) - .has_channel_buffer_changed(channel_id) + cx_a.simulate_keystrokes("2"); + channel_store_b.read_with(cx_b, |channel_store, _| { + assert!(!channel_store.has_channel_buffer_changed(channel_id)) }); - assert!(!has_buffer_changed); - - deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL); // Test that the server is tracking things correctly, and we retain our 'not changed' // state across a disconnect + deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL); server .simulate_long_connection_interruption(client_b.peer_id().unwrap(), deterministic.clone()); - let has_buffer_changed = cx_b.read(|cx| { - client_b - .channel_store() - .read(cx) - .has_channel_buffer_changed(channel_id) + channel_store_b.read_with(cx_b, |channel_store, _| { + assert!(!channel_store.has_channel_buffer_changed(channel_id)) }); - assert!(!has_buffer_changed); // Closing the buffer should re-enable change tracking cx_b.update(|cx| { workspace_b.update(cx, |workspace, cx| { workspace.close_all_items_and_panes(&Default::default(), cx) }); - - drop(channel_view_b) }); - deterministic.run_until_parked(); - channel_buffer_a.update(cx_a, |buffer, cx| { - buffer.buffer().update(cx, |buffer, cx| { - buffer.edit([(0..0, "3")], None, cx); - }) + cx_a.simulate_keystrokes("3"); + channel_store_b.read_with(cx_b, |channel_store, _| { + assert!(channel_store.has_channel_buffer_changed(channel_id)) }); - deterministic.run_until_parked(); +} - let has_buffer_changed = cx_b.read(|cx| { - client_b - .channel_store() - .read(cx) - .has_channel_buffer_changed(channel_id) +#[gpui::test] +async fn test_channel_buffer_changes_persist( + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_b2: &mut TestAppContext, +) { + let (mut server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await; + let (_, cx_a) = client_a.build_test_workspace(cx_a).await; + let (_, cx_b) = client_b.build_test_workspace(cx_b).await; + + // a) edits the notes + open_channel_notes(channel_id, cx_a).await.unwrap(); + cx_a.simulate_keystrokes("1"); + // b) opens them to observe the current version + open_channel_notes(channel_id, cx_b).await.unwrap(); + + // On boot the client should get the correct state. + let client_b2 = server.create_client(cx_b2, "user_b").await; + let channel_store_b2 = client_b2.channel_store().clone(); + channel_store_b2.read_with(cx_b2, |channel_store, _| { + assert!(!channel_store.has_channel_buffer_changed(channel_id)) }); - assert!(has_buffer_changed); } #[track_caller] diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index d7e19dc9a9a6452036030012905b4c4a4b4b8108..e4bf377668c58e20df81c9ea555ce035b9a457f6 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -10,6 +10,7 @@ use channel::{ChannelBuffer, ChannelStore}; use client::{ self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore, }; +use collab_ui::channel_view::ChannelView; use collections::{HashMap, HashSet}; use fs::FakeFs; use futures::{channel::oneshot, StreamExt as _}; @@ -766,6 +767,16 @@ pub fn join_channel_call(cx: &mut TestAppContext) -> Task> { room.unwrap().update(cx, |room, cx| room.join_call(cx)) } +pub fn open_channel_notes( + channel_id: u64, + cx: &mut VisualTestContext, +) -> Task>> { + let window = cx.update(|cx| cx.active_window().unwrap().downcast::().unwrap()); + let view = window.root_view(cx).unwrap(); + + cx.update(|cx| ChannelView::open(channel_id, None, view.clone(), cx)) +} + impl Drop for TestClient { fn drop(&mut self) { self.app_state.client.teardown();